mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-21 16:03:32 +01:00
1975 lines
56 KiB
Go
1975 lines
56 KiB
Go
package zfs
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"os"
|
|
"os/exec"
|
|
"regexp"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"github.com/zrepl/zrepl/internal/util/circlog"
|
|
"github.com/zrepl/zrepl/internal/util/envconst"
|
|
"github.com/zrepl/zrepl/internal/util/nodefault"
|
|
zfsprop "github.com/zrepl/zrepl/internal/zfs/property"
|
|
"github.com/zrepl/zrepl/internal/zfs/zfscmd"
|
|
)
|
|
|
|
type DatasetPath struct {
|
|
comps []string
|
|
}
|
|
|
|
func (p *DatasetPath) ToString() string {
|
|
return strings.Join(p.comps, "/")
|
|
}
|
|
|
|
func (p *DatasetPath) Empty() bool {
|
|
return len(p.comps) == 0
|
|
}
|
|
|
|
func (p *DatasetPath) Extend(extend *DatasetPath) {
|
|
p.comps = append(p.comps, extend.comps...)
|
|
}
|
|
|
|
func (p *DatasetPath) HasPrefix(prefix *DatasetPath) bool {
|
|
if len(prefix.comps) > len(p.comps) {
|
|
return false
|
|
}
|
|
for i := range prefix.comps {
|
|
if prefix.comps[i] != p.comps[i] {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (p *DatasetPath) TrimPrefix(prefix *DatasetPath) {
|
|
if !p.HasPrefix(prefix) {
|
|
return
|
|
}
|
|
prelen := len(prefix.comps)
|
|
newlen := len(p.comps) - prelen
|
|
oldcomps := p.comps
|
|
p.comps = make([]string, newlen)
|
|
for i := 0; i < newlen; i++ {
|
|
p.comps[i] = oldcomps[prelen+i]
|
|
}
|
|
}
|
|
|
|
func (p *DatasetPath) TrimNPrefixComps(n int) {
|
|
if len(p.comps) < n {
|
|
n = len(p.comps)
|
|
}
|
|
if n == 0 {
|
|
return
|
|
}
|
|
p.comps = p.comps[n:]
|
|
|
|
}
|
|
|
|
func (p DatasetPath) Equal(q *DatasetPath) bool {
|
|
if len(p.comps) != len(q.comps) {
|
|
return false
|
|
}
|
|
for i := range p.comps {
|
|
if p.comps[i] != q.comps[i] {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (p *DatasetPath) Length() int {
|
|
return len(p.comps)
|
|
}
|
|
|
|
func (p *DatasetPath) Copy() (c *DatasetPath) {
|
|
c = &DatasetPath{}
|
|
c.comps = make([]string, len(p.comps))
|
|
copy(c.comps, p.comps)
|
|
return
|
|
}
|
|
|
|
func (p *DatasetPath) MarshalJSON() ([]byte, error) {
|
|
return json.Marshal(p.comps)
|
|
}
|
|
|
|
func (p *DatasetPath) UnmarshalJSON(b []byte) error {
|
|
p.comps = make([]string, 0)
|
|
return json.Unmarshal(b, &p.comps)
|
|
}
|
|
|
|
func (p *DatasetPath) Pool() (string, error) {
|
|
if len(p.comps) < 1 {
|
|
return "", fmt.Errorf("dataset path does not have a pool component")
|
|
}
|
|
return p.comps[0], nil
|
|
|
|
}
|
|
|
|
func NewDatasetPath(s string) (p *DatasetPath, err error) {
|
|
p = &DatasetPath{}
|
|
if s == "" {
|
|
p.comps = make([]string, 0)
|
|
return p, nil // the empty dataset path
|
|
}
|
|
const FORBIDDEN = "@#|\t<>*"
|
|
/* Documentation of allowed characters in zfs names:
|
|
https://docs.oracle.com/cd/E19253-01/819-5461/gbcpt/index.html
|
|
Space is missing in the oracle list, but according to
|
|
https://github.com/zfsonlinux/zfs/issues/439
|
|
there is evidence that it was intentionally allowed
|
|
*/
|
|
if strings.ContainsAny(s, FORBIDDEN) {
|
|
err = fmt.Errorf("contains forbidden characters (any of '%s')", FORBIDDEN)
|
|
return
|
|
}
|
|
p.comps = strings.Split(s, "/")
|
|
if p.comps[len(p.comps)-1] == "" {
|
|
err = fmt.Errorf("must not end with a '/'")
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
func toDatasetPath(s string) *DatasetPath {
|
|
p, err := NewDatasetPath(s)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return p
|
|
}
|
|
|
|
type ZFSError struct {
|
|
Stderr []byte
|
|
WaitErr error
|
|
}
|
|
|
|
func (e *ZFSError) Error() string {
|
|
return fmt.Sprintf("zfs exited with error: %s\nstderr:\n%s", e.WaitErr.Error(), e.Stderr)
|
|
}
|
|
|
|
var ZFS_BINARY string = "zfs"
|
|
|
|
func ZFSList(ctx context.Context, properties []string, zfsArgs ...string) (res [][]string, err error) {
|
|
|
|
args := make([]string, 0, 4+len(zfsArgs))
|
|
args = append(args,
|
|
"list", "-H", "-p",
|
|
"-o", strings.Join(properties, ","))
|
|
args = append(args, zfsArgs...)
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
|
|
stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if err = cmd.Start(); err != nil {
|
|
return
|
|
}
|
|
// in case we return early, we want to kill the zfs list process and wait for it to exit
|
|
defer func() {
|
|
_ = cmd.Wait()
|
|
}()
|
|
defer cancel()
|
|
|
|
s := bufio.NewScanner(stdout)
|
|
buf := make([]byte, 1024)
|
|
s.Buffer(buf, 0)
|
|
|
|
res = make([][]string, 0)
|
|
|
|
for s.Scan() {
|
|
fields := strings.SplitN(s.Text(), "\t", len(properties))
|
|
|
|
if len(fields) != len(properties) {
|
|
err = errors.New("unexpected output")
|
|
return
|
|
}
|
|
|
|
res = append(res, fields)
|
|
}
|
|
|
|
if waitErr := cmd.Wait(); waitErr != nil {
|
|
err := &ZFSError{
|
|
Stderr: stderrBuf.Bytes(),
|
|
WaitErr: waitErr,
|
|
}
|
|
return nil, err
|
|
}
|
|
return
|
|
}
|
|
|
|
type ZFSListResult struct {
|
|
Fields []string
|
|
Err error
|
|
}
|
|
|
|
// ZFSListChan executes `zfs list` and sends the results to the `out` channel.
|
|
// The `out` channel is always closed by ZFSListChan:
|
|
// If an error occurs, it is closed after sending a result with the Err field set.
|
|
// If no error occurs, it is just closed.
|
|
// If the operation is cancelled via context, the channel is just closed.
|
|
//
|
|
// If notExistHint is not nil and zfs exits with an error,
|
|
// the stderr is attempted to be interpreted as a *DatasetDoesNotExist error.
|
|
//
|
|
// However, if callers do not drain `out` or cancel via `ctx`, the process will leak either running because
|
|
// IO is pending or as a zombie.
|
|
func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []string, notExistHint *DatasetPath, zfsArgs ...string) {
|
|
defer close(out)
|
|
|
|
args := make([]string, 0, 4+len(zfsArgs))
|
|
args = append(args,
|
|
"list", "-H", "-p",
|
|
"-o", strings.Join(properties, ","))
|
|
args = append(args, zfsArgs...)
|
|
|
|
sendResult := func(fields []string, err error) (done bool) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return true
|
|
case out <- ZFSListResult{fields, err}:
|
|
return false
|
|
}
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
|
|
stdout, stderrBuf, err := cmd.StdoutPipeWithErrorBuf()
|
|
if err != nil {
|
|
sendResult(nil, err)
|
|
return
|
|
}
|
|
if err = cmd.Start(); err != nil {
|
|
sendResult(nil, err)
|
|
return
|
|
}
|
|
defer func() {
|
|
// discard the error, this defer is only relevant if we return while parsing the output
|
|
// in which case we'll return an 'unexpected output' error and not the exit status
|
|
_ = cmd.Wait()
|
|
}()
|
|
defer cancel() // in case we return before our regular call to cmd.Wait(), kill the zfs list process
|
|
|
|
s := bufio.NewScanner(stdout)
|
|
buf := make([]byte, 1024) // max line length
|
|
s.Buffer(buf, 0)
|
|
|
|
for s.Scan() {
|
|
fields := strings.SplitN(s.Text(), "\t", len(properties))
|
|
if len(fields) != len(properties) {
|
|
sendResult(nil, errors.New("unexpected output"))
|
|
return
|
|
}
|
|
if sendResult(fields, nil) {
|
|
return
|
|
}
|
|
}
|
|
if err := cmd.Wait(); err != nil {
|
|
if _, ok := err.(*exec.ExitError); ok {
|
|
var enotexist *DatasetDoesNotExist
|
|
if notExistHint != nil {
|
|
enotexist = tryDatasetDoesNotExist(notExistHint.ToString(), stderrBuf.Bytes())
|
|
}
|
|
if enotexist != nil {
|
|
sendResult(nil, enotexist)
|
|
} else {
|
|
sendResult(nil, &ZFSError{
|
|
Stderr: stderrBuf.Bytes(),
|
|
WaitErr: err,
|
|
})
|
|
}
|
|
} else {
|
|
sendResult(nil, &ZFSError{WaitErr: err})
|
|
}
|
|
return
|
|
}
|
|
if s.Err() != nil {
|
|
sendResult(nil, s.Err())
|
|
return
|
|
}
|
|
}
|
|
|
|
// FIXME replace with EntityNamecheck
|
|
func validateZFSFilesystem(fs string) error {
|
|
if len(fs) < 1 {
|
|
return errors.New("filesystem path must have length > 0")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// v must not be nil and be already validated
|
|
func absVersion(fs string, v *ZFSSendArgVersion) (full string, err error) {
|
|
if err := validateZFSFilesystem(fs); err != nil {
|
|
return "", err
|
|
}
|
|
return fmt.Sprintf("%s%s", fs, v.RelName), nil
|
|
}
|
|
|
|
func pipeWithCapacityHint(capacity int) (r, w *os.File, err error) {
|
|
if capacity < 0 {
|
|
panic(fmt.Sprintf("capacity must be non-negative, got %v", capacity))
|
|
}
|
|
stdoutReader, stdoutWriter, err := os.Pipe()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
trySetPipeCapacity(stdoutWriter, capacity)
|
|
return stdoutReader, stdoutWriter, nil
|
|
}
|
|
|
|
type sendStreamState int
|
|
|
|
const (
|
|
sendStreamOpen sendStreamState = iota
|
|
sendStreamClosed
|
|
)
|
|
|
|
type SendStream struct {
|
|
cmd *zfscmd.Cmd
|
|
stdoutReader io.ReadCloser // not *os.File for mocking during platformtest
|
|
stderrBuf *circlog.CircularLog
|
|
|
|
mtx sync.Mutex
|
|
state sendStreamState
|
|
exitErr *ZFSError
|
|
}
|
|
|
|
func (s *SendStream) Read(p []byte) (n int, _ error) {
|
|
s.mtx.Lock()
|
|
defer s.mtx.Unlock()
|
|
|
|
switch s.state {
|
|
case sendStreamClosed:
|
|
return 0, os.ErrClosed
|
|
|
|
case sendStreamOpen:
|
|
n, readErr := s.stdoutReader.Read(p)
|
|
if readErr != nil {
|
|
debug("sendStream: read: readErr=%T %s", readErr, readErr)
|
|
if readErr == io.EOF {
|
|
// io.EOF must be bubbled up as is so that consumers can handle it properly.
|
|
return n, readErr
|
|
}
|
|
// Assume that the error is not retryable.
|
|
// Try to kill now so that we can return a nice *ZFSError with captured stderr.
|
|
// If the kill doesn't work, it doesn't matter because the caller must by contract call Close() anyways.
|
|
killErr := s.killAndWait()
|
|
debug("sendStream: read: killErr=%T %s", killErr, killErr)
|
|
if killErr == nil {
|
|
s.state = sendStreamClosed
|
|
return n, s.exitErr // return the nice error
|
|
} else {
|
|
// we remain open so that we retry
|
|
return n, readErr // return the normal error
|
|
}
|
|
}
|
|
return n, readErr
|
|
|
|
default:
|
|
panic("unreachable")
|
|
}
|
|
}
|
|
|
|
func (s *SendStream) Close() error {
|
|
debug("sendStream: close called")
|
|
s.mtx.Lock()
|
|
defer s.mtx.Unlock()
|
|
|
|
switch s.state {
|
|
case sendStreamOpen:
|
|
err := s.killAndWait()
|
|
if err != nil {
|
|
return err
|
|
} else {
|
|
s.state = sendStreamClosed
|
|
return nil
|
|
}
|
|
case sendStreamClosed:
|
|
return os.ErrClosed
|
|
default:
|
|
panic("unreachable")
|
|
}
|
|
}
|
|
|
|
// returns nil iff the child process is gone (has been successfully waited upon)
|
|
// in that case, s.exitErr is set
|
|
func (s *SendStream) killAndWait() error {
|
|
|
|
debug("sendStream: killAndWait enter")
|
|
defer debug("sendStream: killAndWait leave")
|
|
|
|
// ensure this function is only called once
|
|
if s.state != sendStreamOpen {
|
|
panic(s.state)
|
|
}
|
|
|
|
// send SIGKILL
|
|
// In an earlier version, we used the starting context.Context's cancel function
|
|
// for this, but in Go > 1.19, doing so will cause .Wait() to return the
|
|
// context cancel error instead of the *exec.ExitError.
|
|
err := s.cmd.Process().Kill()
|
|
if err != nil {
|
|
if err == os.ErrProcessDone {
|
|
// This can happen if
|
|
// (1) the process has already been .Wait()ed, or
|
|
// (2) some other goroutine cancels the context, likely further up
|
|
// the context tree.
|
|
// Case (1) can't happen to us because we only call
|
|
// this function in sendStreamOpen state.
|
|
// In Case (2), it's still our job to .Wait(), so, fallthrough.
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Close our read-end of the pipe.
|
|
//
|
|
// We must do this before .Wait() because in some (not all) versions/build configs of ZFS,
|
|
// `zfs send` uses a separate kernel thread (taskq) to write the send stream (function `dump_bytes`).
|
|
// The `zfs send` thread then waits uinterruptably for the taskq thread to finish the write.
|
|
// And signalling the `zfs send` thread doesn't propagate to the taskq thread.
|
|
// So we end up in a state where we .Wait() forever.
|
|
// (See https://github.com/openzfs/zfs/issues/12500 and
|
|
// https://github.com/zrepl/zrepl/issues/495#issuecomment-902530043)
|
|
//
|
|
// By closing our read end of the pipe before .Wait(), we unblock the taskq thread if there is any.
|
|
// If there is no separate taskq thread, the SIGKILL to `zfs end` would suffice and be most precise,
|
|
// but due to the circumstances above, there is no other portable & robust way.
|
|
//
|
|
// However, the fallout from closing the pipe is that (in non-taskq builds) `zfs sends` will get a SIGPIPE.
|
|
// And on Linux, that SIGPIPE appears to win over the previously issued SIGKILL.
|
|
// And thus, on Linux, the `zfs send` will be killed by the default SIGPIPE handler.
|
|
// We can observe this in the WaitStatus below.
|
|
// This behavior is slightly annoying because the *exec.ExitError's message ("signal: broken pipe")
|
|
// isn't as clear as ("signal: killed").
|
|
// However, it seems like we just have to live with that. (covered by platformtest)
|
|
var closePipeErr error
|
|
if s.stdoutReader != nil {
|
|
closePipeErr = s.stdoutReader.Close()
|
|
if closePipeErr == nil {
|
|
// avoid double-closes in case waiting below doesn't work
|
|
// and someone attempts Close again
|
|
s.stdoutReader = nil
|
|
} else {
|
|
return closePipeErr
|
|
}
|
|
}
|
|
|
|
waitErr := s.cmd.Wait()
|
|
// distinguish between ExitError (which is actually a non-problem for us)
|
|
// vs failed wait syscall (for which we give upper layers the chance to retyr)
|
|
var exitErr *exec.ExitError
|
|
if waitErr != nil {
|
|
if ee, ok := waitErr.(*exec.ExitError); ok {
|
|
exitErr = ee
|
|
} else {
|
|
return waitErr
|
|
}
|
|
}
|
|
|
|
// invariant: at this point, the child is gone and we cleaned up everything related to the SendStream
|
|
|
|
if exitErr != nil {
|
|
// zfs send exited with an error or was killed by a signal.
|
|
s.exitErr = &ZFSError{
|
|
Stderr: []byte(s.stderrBuf.String()),
|
|
WaitErr: exitErr,
|
|
}
|
|
} else {
|
|
// zfs send exited successfully (we know that since waitErr was either nil or wasn't an *exec.ExitError)
|
|
s.exitErr = nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *SendStream) TestOnly_ReplaceStdoutReader(f io.ReadCloser) (prev io.ReadCloser) {
|
|
prev = s.stdoutReader
|
|
s.stdoutReader = f
|
|
return prev
|
|
}
|
|
|
|
func (s *SendStream) TestOnly_ExitErr() *ZFSError { return s.exitErr }
|
|
|
|
// NOTE: When updating this struct, make sure to update funcs Validate ValidateCorrespondsToResumeToken
|
|
type ZFSSendArgVersion struct {
|
|
RelName string
|
|
GUID uint64
|
|
}
|
|
|
|
func (v ZFSSendArgVersion) GetGuid() uint64 { return v.GUID }
|
|
func (v ZFSSendArgVersion) ToSendArgVersion() ZFSSendArgVersion { return v }
|
|
|
|
func (v ZFSSendArgVersion) ValidateInMemory(fs string) error {
|
|
if fs == "" {
|
|
panic(fs)
|
|
}
|
|
if len(v.RelName) == 0 {
|
|
return errors.New("`RelName` must not be empty")
|
|
}
|
|
|
|
var et EntityType
|
|
switch v.RelName[0] {
|
|
case '@':
|
|
et = EntityTypeSnapshot
|
|
case '#':
|
|
et = EntityTypeBookmark
|
|
default:
|
|
return fmt.Errorf("`RelName` field must start with @ or #, got %q", v.RelName)
|
|
}
|
|
|
|
full := v.fullPathUnchecked(fs)
|
|
if err := EntityNamecheck(full, et); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (v ZFSSendArgVersion) mustValidateInMemory(fs string) {
|
|
if err := v.ValidateInMemory(fs); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
// fs must be not empty
|
|
func (a ZFSSendArgVersion) ValidateExistsAndGetVersion(ctx context.Context, fs string) (v FilesystemVersion, _ error) {
|
|
|
|
if err := a.ValidateInMemory(fs); err != nil {
|
|
return v, nil
|
|
}
|
|
|
|
realVersion, err := ZFSGetFilesystemVersion(ctx, a.FullPath(fs))
|
|
if err != nil {
|
|
return v, err
|
|
}
|
|
|
|
if realVersion.Guid != a.GUID {
|
|
return v, fmt.Errorf("`GUID` field does not match real dataset's GUID: %q != %q", realVersion.Guid, a.GUID)
|
|
}
|
|
|
|
return realVersion, nil
|
|
}
|
|
|
|
func (a ZFSSendArgVersion) ValidateExists(ctx context.Context, fs string) error {
|
|
_, err := a.ValidateExistsAndGetVersion(ctx, fs)
|
|
return err
|
|
}
|
|
|
|
func (v ZFSSendArgVersion) FullPath(fs string) string {
|
|
v.mustValidateInMemory(fs)
|
|
return v.fullPathUnchecked(fs)
|
|
}
|
|
|
|
func (v ZFSSendArgVersion) fullPathUnchecked(fs string) string {
|
|
return fmt.Sprintf("%s%s", fs, v.RelName)
|
|
}
|
|
|
|
func (v ZFSSendArgVersion) IsSnapshot() bool {
|
|
v.mustValidateInMemory("unimportant")
|
|
return v.RelName[0] == '@'
|
|
}
|
|
|
|
func (v ZFSSendArgVersion) MustBeBookmark() {
|
|
v.mustValidateInMemory("unimportant")
|
|
if v.RelName[0] != '#' {
|
|
panic(fmt.Sprintf("must be bookmark, got %q", v.RelName))
|
|
}
|
|
}
|
|
|
|
// When updating this struct, check Validate and ValidateCorrespondsToResumeToken (POTENTIALLY SECURITY SENSITIVE)
|
|
type ZFSSendArgsUnvalidated struct {
|
|
FS string
|
|
From, To *ZFSSendArgVersion // From may be nil
|
|
ZFSSendFlags
|
|
}
|
|
|
|
type ZFSSendArgsValidated struct {
|
|
ZFSSendArgsUnvalidated
|
|
FromVersion *FilesystemVersion
|
|
ToVersion FilesystemVersion
|
|
}
|
|
|
|
type ZFSSendFlags struct {
|
|
Encrypted *nodefault.Bool
|
|
Properties bool
|
|
BackupProperties bool
|
|
Raw bool
|
|
LargeBlocks bool
|
|
Compressed bool
|
|
EmbeddedData bool
|
|
Saved bool
|
|
|
|
// Preferred if not empty
|
|
ResumeToken string // if not nil, must match what is specified in From, To (covered by ValidateCorrespondsToResumeToken)
|
|
}
|
|
|
|
type zfsSendArgsValidationContext struct {
|
|
encEnabled *nodefault.Bool
|
|
}
|
|
|
|
type ZFSSendArgsValidationErrorCode int
|
|
|
|
const (
|
|
ZFSSendArgsGenericValidationError ZFSSendArgsValidationErrorCode = 1 + iota
|
|
ZFSSendArgsEncryptedSendRequestedButFSUnencrypted
|
|
ZFSSendArgsFSEncryptionCheckFail
|
|
ZFSSendArgsResumeTokenMismatch
|
|
)
|
|
|
|
type ZFSSendArgsValidationError struct {
|
|
Args ZFSSendArgsUnvalidated
|
|
What ZFSSendArgsValidationErrorCode
|
|
Msg error
|
|
}
|
|
|
|
func newValidationError(sendArgs ZFSSendArgsUnvalidated, what ZFSSendArgsValidationErrorCode, cause error) *ZFSSendArgsValidationError {
|
|
return &ZFSSendArgsValidationError{sendArgs, what, cause}
|
|
}
|
|
|
|
func newGenericValidationError(sendArgs ZFSSendArgsUnvalidated, cause error) *ZFSSendArgsValidationError {
|
|
return &ZFSSendArgsValidationError{sendArgs, ZFSSendArgsGenericValidationError, cause}
|
|
}
|
|
|
|
func (e ZFSSendArgsValidationError) Error() string {
|
|
return e.Msg.Error()
|
|
}
|
|
|
|
type zfsSendArgsSkipValidationKeyType struct{}
|
|
|
|
var zfsSendArgsSkipValidationKey = zfsSendArgsSkipValidationKeyType{}
|
|
|
|
func ZFSSendArgsSkipValidation(ctx context.Context) context.Context {
|
|
return context.WithValue(ctx, zfsSendArgsSkipValidationKey, true)
|
|
}
|
|
|
|
// - Recursively call Validate on each field.
|
|
// - Make sure that if ResumeToken != "", it reflects the same operation as the other parameters would.
|
|
//
|
|
// This function is not pure because GUIDs are checked against the local host's datasets.
|
|
func (a ZFSSendArgsUnvalidated) Validate(ctx context.Context) (v ZFSSendArgsValidated, _ error) {
|
|
if dp, err := NewDatasetPath(a.FS); err != nil || dp.Length() == 0 {
|
|
return v, newGenericValidationError(a, fmt.Errorf("`FS` must be a valid non-zero dataset path"))
|
|
}
|
|
|
|
if a.To == nil {
|
|
return v, newGenericValidationError(a, fmt.Errorf("`To` must not be nil"))
|
|
}
|
|
toVersion, err := a.To.ValidateExistsAndGetVersion(ctx, a.FS)
|
|
if err != nil {
|
|
return v, newGenericValidationError(a, errors.Wrap(err, "`To` invalid"))
|
|
}
|
|
|
|
var fromVersion *FilesystemVersion
|
|
if a.From != nil {
|
|
fromV, err := a.From.ValidateExistsAndGetVersion(ctx, a.FS)
|
|
if err != nil {
|
|
return v, newGenericValidationError(a, errors.Wrap(err, "`From` invalid"))
|
|
}
|
|
fromVersion = &fromV
|
|
// fallthrough
|
|
}
|
|
|
|
validated := ZFSSendArgsValidated{
|
|
ZFSSendArgsUnvalidated: a,
|
|
FromVersion: fromVersion,
|
|
ToVersion: toVersion,
|
|
}
|
|
|
|
if ctx.Value(zfsSendArgsSkipValidationKey) != nil {
|
|
return validated, nil
|
|
}
|
|
|
|
if err := a.ZFSSendFlags.Validate(); err != nil {
|
|
return v, newGenericValidationError(a, errors.Wrap(err, "send flags invalid"))
|
|
}
|
|
|
|
valCtx := &zfsSendArgsValidationContext{}
|
|
fsEncrypted, err := ZFSGetEncryptionEnabled(ctx, a.FS)
|
|
if err != nil {
|
|
return v, newValidationError(a, ZFSSendArgsFSEncryptionCheckFail,
|
|
errors.Wrapf(err, "cannot check whether filesystem %q is encrypted", a.FS))
|
|
}
|
|
valCtx.encEnabled = &nodefault.Bool{B: fsEncrypted}
|
|
|
|
if a.Encrypted.B && !fsEncrypted {
|
|
return v, newValidationError(a, ZFSSendArgsEncryptedSendRequestedButFSUnencrypted,
|
|
errors.Errorf("encrypted send mandated by policy, but filesystem %q is not encrypted", a.FS))
|
|
}
|
|
|
|
if a.Raw && fsEncrypted && !a.Encrypted.B {
|
|
return v, newValidationError(a, ZFSSendArgsGenericValidationError,
|
|
errors.Errorf("policy mandates raw+unencrypted sends, but filesystem %q is encrypted", a.FS))
|
|
}
|
|
|
|
if err := a.validateEncryptionFlagsCorrespondToResumeToken(ctx, valCtx); err != nil {
|
|
return v, newValidationError(a, ZFSSendArgsResumeTokenMismatch, err)
|
|
}
|
|
|
|
return validated, nil
|
|
}
|
|
|
|
func (f ZFSSendFlags) Validate() error {
|
|
if err := f.Encrypted.ValidateNoDefault(); err != nil {
|
|
return errors.Wrap(err, "flag `Encrypted` invalid")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// If ResumeToken is empty, builds a command line with the flags specified.
|
|
// If ResumeToken is not empty, build a command line with just `-t {{.ResumeToken}}`.
|
|
//
|
|
// SECURITY SENSITIVE it is the caller's responsibility to ensure that a.Encrypted semantics
|
|
// hold for the file system that will be sent with the send flags returned by this function
|
|
func (a ZFSSendFlags) buildSendFlagsUnchecked() []string {
|
|
|
|
args := make([]string, 0)
|
|
|
|
// ResumeToken takes precedence, we assume that it has been validated
|
|
// to reflect what is described by the other fields.
|
|
if a.ResumeToken != "" {
|
|
args = append(args, "-t", a.ResumeToken)
|
|
return args
|
|
}
|
|
|
|
if a.Encrypted.B || a.Raw {
|
|
args = append(args, "-w")
|
|
}
|
|
|
|
if a.Properties {
|
|
args = append(args, "-p")
|
|
}
|
|
|
|
if a.BackupProperties {
|
|
args = append(args, "-b")
|
|
}
|
|
|
|
if a.LargeBlocks {
|
|
args = append(args, "-L")
|
|
}
|
|
|
|
if a.Compressed {
|
|
args = append(args, "-c")
|
|
}
|
|
|
|
if a.EmbeddedData {
|
|
args = append(args, "-e")
|
|
}
|
|
|
|
if a.Saved {
|
|
args = append(args, "-S")
|
|
}
|
|
|
|
return args
|
|
}
|
|
|
|
func (a ZFSSendArgsValidated) buildSendCommandLine() ([]string, error) {
|
|
|
|
flags := a.buildSendFlagsUnchecked()
|
|
|
|
if a.ZFSSendFlags.ResumeToken != "" {
|
|
return flags, nil
|
|
}
|
|
|
|
toV, err := absVersion(a.FS, a.To)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fromV := ""
|
|
if a.From != nil {
|
|
fromV, err = absVersion(a.FS, a.From)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if fromV == "" { // Initial
|
|
flags = append(flags, toV)
|
|
} else {
|
|
flags = append(flags, "-i", fromV, toV)
|
|
}
|
|
return flags, nil
|
|
}
|
|
|
|
type ZFSSendArgsResumeTokenMismatchError struct {
|
|
What ZFSSendArgsResumeTokenMismatchErrorCode
|
|
Err error
|
|
}
|
|
|
|
func (e *ZFSSendArgsResumeTokenMismatchError) Error() string { return e.Err.Error() }
|
|
|
|
type ZFSSendArgsResumeTokenMismatchErrorCode int
|
|
|
|
// The format is ZFSSendArgsResumeTokenMismatch+WhatIsWrongInToken
|
|
const (
|
|
ZFSSendArgsResumeTokenMismatchGeneric ZFSSendArgsResumeTokenMismatchErrorCode = 1 + iota
|
|
ZFSSendArgsResumeTokenMismatchEncryptionNotSet // encryption not set in token but required by send args
|
|
ZFSSendArgsResumeTokenMismatchEncryptionSet // encryption not set in token but not required by send args
|
|
ZFSSendArgsResumeTokenMismatchFilesystem
|
|
)
|
|
|
|
func (c ZFSSendArgsResumeTokenMismatchErrorCode) fmt(format string, args ...interface{}) *ZFSSendArgsResumeTokenMismatchError {
|
|
return &ZFSSendArgsResumeTokenMismatchError{
|
|
What: c,
|
|
Err: fmt.Errorf(format, args...),
|
|
}
|
|
}
|
|
|
|
// Validate that the encryption settings specified in `a` correspond to the encryption settings encoded in the resume token.
|
|
//
|
|
// This is SECURITY SENSITIVE:
|
|
// It is possible for an attacker to craft arbitrary resume tokens.
|
|
// Those malicious resume tokens could encode different parameters in the resume token than expected:
|
|
// for example, they may specify another file system (e.g. the filesystem with secret data) or request unencrypted send instead of encrypted raw send.
|
|
//
|
|
// Note that we don't check correspondence of all other send flags because
|
|
// a) the resume token does not capture all send flags (e.g. send -p is implemented in libzfs and thus not represented in the resume token)
|
|
// b) it would force us to either reject resume tokens with unknown flags.
|
|
func (a ZFSSendArgsUnvalidated) validateEncryptionFlagsCorrespondToResumeToken(ctx context.Context, valCtx *zfsSendArgsValidationContext) error {
|
|
|
|
if a.ResumeToken == "" {
|
|
return nil // nothing to do
|
|
}
|
|
|
|
debug("decoding resume token %q", a.ResumeToken)
|
|
t, err := ParseResumeToken(ctx, a.ResumeToken)
|
|
debug("decode resume token result: %#v %T %v", t, err, err)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
tokenFS, _, err := t.ToNameSplit()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
gen := ZFSSendArgsResumeTokenMismatchGeneric
|
|
|
|
if a.FS != tokenFS.ToString() {
|
|
return ZFSSendArgsResumeTokenMismatchFilesystem.fmt(
|
|
"filesystem in resume token field `toname` = %q does not match expected value %q", tokenFS.ToString(), a.FS)
|
|
}
|
|
|
|
// If From is set, it must match.
|
|
if (a.From != nil) != t.HasFromGUID { // existence must be same
|
|
if t.HasFromGUID {
|
|
return gen.fmt("resume token not expected to be incremental, but `fromguid` = %v", t.FromGUID)
|
|
} else {
|
|
return gen.fmt("resume token expected to be incremental, but `fromguid` not present")
|
|
}
|
|
} else if t.HasFromGUID { // if exists (which is same, we checked above), they must match
|
|
if t.FromGUID != a.From.GUID {
|
|
return gen.fmt("resume token `fromguid` != expected: %v != %v", t.FromGUID, a.From.GUID)
|
|
}
|
|
} else {
|
|
_ = struct{}{} // both empty, ok
|
|
}
|
|
|
|
// To must never be empty
|
|
if !t.HasToGUID {
|
|
return gen.fmt("resume token does not have `toguid`")
|
|
}
|
|
if t.ToGUID != a.To.GUID { // a.To != nil because Validate checks for that
|
|
return gen.fmt("resume token `toguid` != expected: %v != %v", t.ToGUID, a.To.GUID)
|
|
}
|
|
|
|
// ensure resume stream will be encrypted/unencrypted as specified in policy
|
|
if err := valCtx.encEnabled.ValidateNoDefault(); err != nil {
|
|
panic(valCtx)
|
|
}
|
|
wouldSendEncryptedIfFilesystemIsEncrypted := t.RawOK
|
|
filesystemIsEncrypted := valCtx.encEnabled.B
|
|
resumeWillBeEncryptedSend := filesystemIsEncrypted && wouldSendEncryptedIfFilesystemIsEncrypted
|
|
if a.Encrypted.B {
|
|
if resumeWillBeEncryptedSend {
|
|
return nil // encrypted send in policy, and that's what's going to happen
|
|
} else {
|
|
if !filesystemIsEncrypted {
|
|
// NB: a.Encrypted.B && !valCtx.encEnabled.B
|
|
// is handled in the caller, because it doesn't concern the resume token (different kind of error)
|
|
panic("caller should have already raised an error")
|
|
}
|
|
// XXX we have no test coverage for this case. We'd need to forge a resume token for that.
|
|
return ZFSSendArgsResumeTokenMismatchEncryptionNotSet.fmt(
|
|
"resume token does not have rawok=true which would result in an unencrypted send, but policy mandates encrypted sends only")
|
|
}
|
|
} else {
|
|
if resumeWillBeEncryptedSend {
|
|
return ZFSSendArgsResumeTokenMismatchEncryptionSet.fmt(
|
|
"resume token has rawok=true which would result in encrypted send, but policy mandates unencrypted sends only")
|
|
} else {
|
|
return nil // unencrypted send in policy, and that's what's going to happen
|
|
}
|
|
}
|
|
}
|
|
|
|
var zfsSendStderrCaptureMaxSize = envconst.Int("ZREPL_ZFS_SEND_STDERR_MAX_CAPTURE_SIZE", 1<<15)
|
|
|
|
var ErrEncryptedSendNotSupported = fmt.Errorf("raw sends which are required for encrypted zfs send are not supported")
|
|
|
|
// if token != "", then send -t token is used
|
|
// otherwise send [-i from] to is used
|
|
// (if from is "" a full ZFS send is done)
|
|
//
|
|
// Returns ErrEncryptedSendNotSupported if encrypted send is requested but not supported by CLI
|
|
func ZFSSend(ctx context.Context, sendArgs ZFSSendArgsValidated) (*SendStream, error) {
|
|
|
|
args := make([]string, 0)
|
|
args = append(args, "send")
|
|
|
|
// pre-validation of sendArgs for plain ErrEncryptedSendNotSupported error
|
|
// we tie BackupProperties (send -b) and SendRaw (-w, same as with Encrypted) to this
|
|
// since these were released together.
|
|
if sendArgs.Encrypted.B || sendArgs.Raw || sendArgs.BackupProperties {
|
|
encryptionSupported, err := EncryptionCLISupported(ctx)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "cannot determine CLI native encryption support")
|
|
}
|
|
|
|
if !encryptionSupported {
|
|
return nil, ErrEncryptedSendNotSupported
|
|
}
|
|
}
|
|
|
|
sargs, err := sendArgs.buildSendCommandLine()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
args = append(args, sargs...)
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
// setup stdout with an os.Pipe to control pipe buffer size
|
|
stdoutReader, stdoutWriter, err := pipeWithCapacityHint(getPipeCapacityHint("ZFS_SEND_PIPE_CAPACITY_HINT"))
|
|
if err != nil {
|
|
cancel()
|
|
return nil, err
|
|
}
|
|
stderrBuf := circlog.MustNewCircularLog(zfsSendStderrCaptureMaxSize)
|
|
|
|
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
|
|
cmd.SetStdio(zfscmd.Stdio{
|
|
Stdin: nil,
|
|
Stdout: stdoutWriter,
|
|
Stderr: stderrBuf,
|
|
})
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
cancel()
|
|
stdoutWriter.Close()
|
|
stdoutReader.Close()
|
|
return nil, errors.Wrap(err, "cannot start zfs send command")
|
|
}
|
|
// close our writing-end of the pipe so that we don't wait for ourselves when reading from the reading end
|
|
stdoutWriter.Close()
|
|
|
|
stream := &SendStream{
|
|
cmd: cmd,
|
|
stdoutReader: stdoutReader,
|
|
stderrBuf: stderrBuf,
|
|
}
|
|
_ = cancel // the SendStream.killAndWait() will kill the process
|
|
|
|
return stream, nil
|
|
}
|
|
|
|
type DrySendType string
|
|
|
|
const (
|
|
DrySendTypeFull DrySendType = "full"
|
|
DrySendTypeIncremental DrySendType = "incremental"
|
|
)
|
|
|
|
func DrySendTypeFromString(s string) (DrySendType, error) {
|
|
switch s {
|
|
case string(DrySendTypeFull):
|
|
return DrySendTypeFull, nil
|
|
case string(DrySendTypeIncremental):
|
|
return DrySendTypeIncremental, nil
|
|
default:
|
|
return "", fmt.Errorf("unknown dry send type %q", s)
|
|
}
|
|
}
|
|
|
|
type DrySendInfo struct {
|
|
Type DrySendType
|
|
Filesystem string // parsed from To field
|
|
From, To string // direct copy from ZFS output
|
|
SizeEstimate uint64 // 0 if size estimate is not possible
|
|
}
|
|
|
|
var (
|
|
// keep same number of capture groups for unmarshalInfoLine homogeneity
|
|
|
|
sendDryRunInfoLineRegexFull = regexp.MustCompile(`^(?P<type>full)\t()(?P<to>[^\t]+@[^\t]+)(\t(?P<size>[0-9]+))?$`)
|
|
// cannot enforce '[#@]' in incremental source, see test cases
|
|
sendDryRunInfoLineRegexIncremental = regexp.MustCompile(`^(?P<type>incremental)\t(?P<from>[^\t]+)\t(?P<to>[^\t]+@[^\t]+)(\t(?P<size>[0-9]+))?$`)
|
|
)
|
|
|
|
// see test cases for example output
|
|
func (s *DrySendInfo) unmarshalZFSOutput(output []byte) (err error) {
|
|
debug("DrySendInfo.unmarshalZFSOutput: output=%q", output)
|
|
lines := strings.Split(string(output), "\n")
|
|
for _, l := range lines {
|
|
regexMatched, err := s.unmarshalInfoLine(l)
|
|
if err != nil {
|
|
return fmt.Errorf("line %q: %s", l, err)
|
|
}
|
|
if !regexMatched {
|
|
continue
|
|
}
|
|
return nil
|
|
}
|
|
return fmt.Errorf("no match for info line (regex1 %s) (regex2 %s)", sendDryRunInfoLineRegexFull, sendDryRunInfoLineRegexIncremental)
|
|
}
|
|
|
|
// unmarshal info line, looks like this:
|
|
//
|
|
// full zroot/test/a@1 5389768
|
|
// incremental zroot/test/a@1 zroot/test/a@2 5383936
|
|
//
|
|
// => see test cases
|
|
func (s *DrySendInfo) unmarshalInfoLine(l string) (regexMatched bool, err error) {
|
|
|
|
mFull := sendDryRunInfoLineRegexFull.FindStringSubmatch(l)
|
|
mInc := sendDryRunInfoLineRegexIncremental.FindStringSubmatch(l)
|
|
var matchingExpr *regexp.Regexp
|
|
var m []string
|
|
if mFull == nil && mInc == nil {
|
|
return false, nil
|
|
} else if mFull != nil && mInc != nil {
|
|
panic(fmt.Sprintf("ambiguous ZFS dry send output: %q", l))
|
|
} else if mFull != nil {
|
|
matchingExpr, m = sendDryRunInfoLineRegexFull, mFull
|
|
} else if mInc != nil {
|
|
matchingExpr, m = sendDryRunInfoLineRegexIncremental, mInc
|
|
}
|
|
|
|
fields := make(map[string]string, matchingExpr.NumSubexp())
|
|
for i, name := range matchingExpr.SubexpNames() {
|
|
if i != 0 {
|
|
fields[name] = m[i]
|
|
}
|
|
}
|
|
|
|
s.Type, err = DrySendTypeFromString(fields["type"])
|
|
if err != nil {
|
|
return true, err
|
|
}
|
|
|
|
s.From = fields["from"]
|
|
s.To = fields["to"]
|
|
toFS, _, _, err := DecomposeVersionString(s.To)
|
|
if err != nil {
|
|
return true, fmt.Errorf("'to' is not a valid filesystem version: %s", err)
|
|
}
|
|
s.Filesystem = toFS
|
|
|
|
if fields["size"] == "" {
|
|
// workaround for OpenZFS 0.7 prior to https://github.com/openzfs/zfs/commit/835db58592d7d947e5818eb7281882e2a46073e0#diff-66bd524398bcd2ac70d90925ab6d8073L1245
|
|
// see https://github.com/zrepl/zrepl/issues/289
|
|
fields["size"] = "0"
|
|
}
|
|
s.SizeEstimate, err = strconv.ParseUint(fields["size"], 10, 64)
|
|
if err != nil {
|
|
return true, fmt.Errorf("cannot not parse size: %s", err)
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// to may be "", in which case a full ZFS send is done
|
|
// May return BookmarkSizeEstimationNotSupported as err if from is a bookmark.
|
|
func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgsValidated) (_ *DrySendInfo, err error) {
|
|
|
|
if sendArgs.From != nil && strings.Contains(sendArgs.From.RelName, "#") {
|
|
/* TODO:
|
|
* XXX feature check & support this as well
|
|
* ZFS at the time of writing does not support dry-run send because size-estimation
|
|
* uses fromSnap's deadlist. However, for a bookmark, that deadlist no longer exists.
|
|
* Redacted send & recv will bring this functionality, see
|
|
* https://github.com/openzfs/openzfs/pull/484
|
|
*/
|
|
fromAbs, err := absVersion(sendArgs.FS, sendArgs.From)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error building abs version for 'from': %s", err)
|
|
}
|
|
toAbs, err := absVersion(sendArgs.FS, sendArgs.To)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error building abs version for 'to': %s", err)
|
|
}
|
|
return &DrySendInfo{
|
|
Type: DrySendTypeIncremental,
|
|
Filesystem: sendArgs.FS,
|
|
From: fromAbs,
|
|
To: toAbs,
|
|
SizeEstimate: 0}, nil
|
|
}
|
|
|
|
args := make([]string, 0)
|
|
args = append(args, "send", "-n", "-v", "-P")
|
|
sargs, err := sendArgs.buildSendCommandLine()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
args = append(args, sargs...)
|
|
|
|
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
return nil, &ZFSError{output, err}
|
|
}
|
|
var si DrySendInfo
|
|
if err := si.unmarshalZFSOutput(output); err != nil {
|
|
return nil, fmt.Errorf("could not parse zfs send -n output: %s", err)
|
|
}
|
|
|
|
// There is a bug in OpenZFS where it estimates the size incorrectly.
|
|
// - zrepl: https://github.com/zrepl/zrepl/issues/463
|
|
// - resulting upstream bug: https://github.com/openzfs/zfs/issues/12265
|
|
//
|
|
// The wrong estimates are easy to detect because they are absurdly large.
|
|
// NB: we're doing the workaround for this late so that the test cases are not affected.
|
|
sizeEstimateThreshold := envconst.Uint64("ZREPL_ZFS_SEND_SIZE_ESTIMATE_INCORRECT_THRESHOLD", math.MaxInt64)
|
|
if sizeEstimateThreshold != 0 && si.SizeEstimate >= sizeEstimateThreshold {
|
|
debug("size estimate exceeds threshold %v, working around it: %#v %q", sizeEstimateThreshold, si, args)
|
|
si.SizeEstimate = 0
|
|
}
|
|
|
|
return &si, nil
|
|
}
|
|
|
|
type ErrRecvResumeNotSupported struct {
|
|
FS string
|
|
CheckErr error
|
|
}
|
|
|
|
func (e *ErrRecvResumeNotSupported) Error() string {
|
|
var buf strings.Builder
|
|
fmt.Fprintf(&buf, "zfs resumable recv into %q: ", e.FS)
|
|
if e.CheckErr != nil {
|
|
fmt.Fprint(&buf, e.CheckErr.Error())
|
|
} else {
|
|
fmt.Fprintf(&buf, "not supported by ZFS or pool")
|
|
}
|
|
return buf.String()
|
|
}
|
|
|
|
type RecvOptions struct {
|
|
// Rollback to the oldest snapshot, destroy it, then perform `recv -F`.
|
|
// Note that this doesn't change property values, i.e. an existing local property value will be kept.
|
|
RollbackAndForceRecv bool
|
|
// Set -s flag used for resumable send & recv
|
|
SavePartialRecvState bool
|
|
|
|
InheritProperties []zfsprop.Property
|
|
OverrideProperties map[zfsprop.Property]string
|
|
}
|
|
|
|
func (opts RecvOptions) buildRecvFlags() []string {
|
|
args := make([]string, 0)
|
|
if opts.RollbackAndForceRecv {
|
|
args = append(args, "-F")
|
|
}
|
|
if opts.SavePartialRecvState {
|
|
args = append(args, "-s")
|
|
}
|
|
if opts.InheritProperties != nil {
|
|
for _, prop := range opts.InheritProperties {
|
|
args = append(args, "-x", string(prop))
|
|
}
|
|
}
|
|
if opts.OverrideProperties != nil {
|
|
for prop, value := range opts.OverrideProperties {
|
|
args = append(args, "-o", fmt.Sprintf("%s=%s", prop, value))
|
|
}
|
|
}
|
|
|
|
return args
|
|
}
|
|
|
|
const RecvStderrBufSiz = 1 << 15
|
|
|
|
func ZFSRecv(ctx context.Context, fs string, v *ZFSSendArgVersion, stream io.ReadCloser, opts RecvOptions) (err error) {
|
|
|
|
if err := v.ValidateInMemory(fs); err != nil {
|
|
return errors.Wrap(err, "invalid version")
|
|
}
|
|
if !v.IsSnapshot() {
|
|
return errors.New("must receive into a snapshot")
|
|
}
|
|
|
|
fsdp, err := NewDatasetPath(fs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if opts.RollbackAndForceRecv {
|
|
// destroy all snapshots before `recv -F` because `recv -F`
|
|
// does not perform a rollback unless `send -R` was used (which we assume hasn't been the case)
|
|
snaps, err := ZFSListFilesystemVersions(ctx, fsdp, ListFilesystemVersionsOptions{
|
|
Types: Snapshots,
|
|
})
|
|
if _, ok := err.(*DatasetDoesNotExist); ok {
|
|
snaps = []FilesystemVersion{}
|
|
} else if err != nil {
|
|
return fmt.Errorf("cannot list versions for rollback for forced receive: %s", err)
|
|
}
|
|
sort.Slice(snaps, func(i, j int) bool {
|
|
return snaps[i].CreateTXG < snaps[j].CreateTXG
|
|
})
|
|
// bookmarks are rolled back automatically
|
|
if len(snaps) > 0 {
|
|
// use rollback to efficiently destroy all but the earliest snapshot
|
|
// then destroy that earliest snapshot
|
|
// afterwards, `recv -F` will work
|
|
rollbackTarget := snaps[0]
|
|
rollbackTargetAbs := rollbackTarget.ToAbsPath(fsdp)
|
|
debug("recv: rollback to %q", rollbackTargetAbs)
|
|
if err := ZFSRollback(ctx, fsdp, rollbackTarget, "-r"); err != nil {
|
|
return fmt.Errorf("cannot rollback %s to %s for forced receive: %s", fsdp.ToString(), rollbackTarget, err)
|
|
}
|
|
debug("recv: destroy %q", rollbackTargetAbs)
|
|
if err := ZFSDestroy(ctx, rollbackTargetAbs); err != nil {
|
|
return fmt.Errorf("cannot destroy %s for forced receive: %s", rollbackTargetAbs, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
if opts.SavePartialRecvState {
|
|
if supported, err := ResumeRecvSupported(ctx, fsdp); err != nil || !supported {
|
|
return &ErrRecvResumeNotSupported{FS: fs, CheckErr: err}
|
|
}
|
|
}
|
|
|
|
args := []string{"recv"}
|
|
args = append(args, opts.buildRecvFlags()...)
|
|
args = append(args, v.FullPath(fs))
|
|
|
|
ctx, cancelCmd := context.WithCancel(ctx)
|
|
defer cancelCmd()
|
|
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
|
|
|
|
// TODO report bug upstream
|
|
// Setup an unused stdout buffer.
|
|
// Otherwise, ZoL v0.6.5.9-1 3.16.0-4-amd64 writes the following error to stderr and exits with code 1
|
|
// cannot receive new filesystem stream: invalid backup stream
|
|
stdout := bytes.NewBuffer(make([]byte, 0, 1024))
|
|
|
|
stderr := bytes.NewBuffer(make([]byte, 0, RecvStderrBufSiz))
|
|
|
|
stdin, stdinWriter, err := pipeWithCapacityHint(getPipeCapacityHint("ZFS_RECV_PIPE_CAPACITY_HINT"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cmd.SetStdio(zfscmd.Stdio{
|
|
Stdin: stdin,
|
|
Stdout: stdout,
|
|
Stderr: stderr,
|
|
})
|
|
|
|
if err = cmd.Start(); err != nil {
|
|
stdinWriter.Close()
|
|
stdin.Close()
|
|
return err
|
|
}
|
|
stdin.Close()
|
|
defer stdinWriter.Close()
|
|
|
|
pid := cmd.Process().Pid
|
|
debug := func(format string, args ...interface{}) {
|
|
debug("recv: pid=%v: %s", pid, fmt.Sprintf(format, args...))
|
|
}
|
|
|
|
debug("started")
|
|
|
|
copierErrChan := make(chan error)
|
|
go func() {
|
|
_, err := io.Copy(stdinWriter, stream)
|
|
copierErrChan <- err
|
|
stdinWriter.Close()
|
|
}()
|
|
waitErrChan := make(chan error)
|
|
go func() {
|
|
defer close(waitErrChan)
|
|
if err = cmd.Wait(); err != nil {
|
|
if rtErr := tryRecvErrorWithResumeToken(ctx, stderr.String()); rtErr != nil {
|
|
waitErrChan <- rtErr
|
|
} else if owErr := tryRecvDestroyOrOverwriteEncryptedErr(stderr.Bytes()); owErr != nil {
|
|
waitErrChan <- owErr
|
|
} else if readErr := tryRecvCannotReadFromStreamErr(stderr.Bytes()); readErr != nil {
|
|
waitErrChan <- readErr
|
|
} else {
|
|
waitErrChan <- &ZFSError{
|
|
Stderr: stderr.Bytes(),
|
|
WaitErr: err,
|
|
}
|
|
}
|
|
return
|
|
}
|
|
}()
|
|
|
|
copierErr := <-copierErrChan
|
|
debug("copierErr: %T %s", copierErr, copierErr)
|
|
if copierErr != nil {
|
|
debug("killing zfs recv command after copierErr")
|
|
cancelCmd()
|
|
}
|
|
|
|
waitErr := <-waitErrChan
|
|
debug("waitErr: %T %s", waitErr, waitErr)
|
|
|
|
if copierErr == nil && waitErr == nil {
|
|
return nil
|
|
} else if _, isReadErr := waitErr.(*RecvCannotReadFromStreamErr); isReadErr {
|
|
return copierErr // likely network error reading from stream
|
|
} else {
|
|
return waitErr // almost always more interesting info. NOTE: do not wrap!
|
|
}
|
|
}
|
|
|
|
type RecvFailedWithResumeTokenErr struct {
|
|
Msg string
|
|
ResumeTokenRaw string
|
|
ResumeTokenParsed *ResumeToken
|
|
}
|
|
|
|
var recvErrorResumeTokenRE = regexp.MustCompile(`A resuming stream can be generated on the sending system by running:\s+zfs send -t\s(\S+)`)
|
|
|
|
func tryRecvErrorWithResumeToken(ctx context.Context, stderr string) *RecvFailedWithResumeTokenErr {
|
|
if match := recvErrorResumeTokenRE.FindStringSubmatch(stderr); match != nil {
|
|
parsed, err := ParseResumeToken(ctx, match[1])
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
return &RecvFailedWithResumeTokenErr{
|
|
Msg: stderr,
|
|
ResumeTokenRaw: match[1],
|
|
ResumeTokenParsed: parsed,
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *RecvFailedWithResumeTokenErr) Error() string {
|
|
return fmt.Sprintf("receive failed, resume token available: %s\n%#v", e.ResumeTokenRaw, e.ResumeTokenParsed)
|
|
}
|
|
|
|
type RecvDestroyOrOverwriteEncryptedErr struct {
|
|
Msg string
|
|
}
|
|
|
|
func (e *RecvDestroyOrOverwriteEncryptedErr) Error() string {
|
|
return e.Msg
|
|
}
|
|
|
|
var recvDestroyOrOverwriteEncryptedErrRe = regexp.MustCompile(`^(cannot receive new filesystem stream: zfs receive -F cannot be used to destroy an encrypted filesystem or overwrite an unencrypted one with an encrypted one)`)
|
|
|
|
func tryRecvDestroyOrOverwriteEncryptedErr(stderr []byte) *RecvDestroyOrOverwriteEncryptedErr {
|
|
debug("tryRecvDestroyOrOverwriteEncryptedErr: %v", stderr)
|
|
m := recvDestroyOrOverwriteEncryptedErrRe.FindSubmatch(stderr)
|
|
if m == nil {
|
|
return nil
|
|
}
|
|
return &RecvDestroyOrOverwriteEncryptedErr{Msg: string(m[1])}
|
|
}
|
|
|
|
type RecvCannotReadFromStreamErr struct {
|
|
Msg string
|
|
}
|
|
|
|
func (e *RecvCannotReadFromStreamErr) Error() string {
|
|
return e.Msg
|
|
}
|
|
|
|
var reRecvCannotReadFromStreamErr = regexp.MustCompile(`^(cannot receive: failed to read from stream)$`)
|
|
|
|
func tryRecvCannotReadFromStreamErr(stderr []byte) *RecvCannotReadFromStreamErr {
|
|
m := reRecvCannotReadFromStreamErr.FindSubmatch(stderr)
|
|
if m == nil {
|
|
return nil
|
|
}
|
|
return &RecvCannotReadFromStreamErr{Msg: string(m[1])}
|
|
}
|
|
|
|
type ClearResumeTokenError struct {
|
|
ZFSOutput []byte
|
|
CmdError error
|
|
}
|
|
|
|
func (e ClearResumeTokenError) Error() string {
|
|
return fmt.Sprintf("could not clear resume token: %q", string(e.ZFSOutput))
|
|
}
|
|
|
|
// always returns *ClearResumeTokenError
|
|
func ZFSRecvClearResumeToken(ctx context.Context, fs string) (err error) {
|
|
if err := validateZFSFilesystem(fs); err != nil {
|
|
return err
|
|
}
|
|
|
|
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "recv", "-A", fs)
|
|
o, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
if bytes.Contains(o, []byte("does not have any resumable receive state to abort")) {
|
|
return nil
|
|
}
|
|
return &ClearResumeTokenError{o, err}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type PropertyValue struct {
|
|
Value string
|
|
Source PropertySource
|
|
}
|
|
|
|
type ZFSProperties struct {
|
|
m map[string]PropertyValue
|
|
}
|
|
|
|
func NewZFSProperties() *ZFSProperties {
|
|
return &ZFSProperties{make(map[string]PropertyValue, 4)}
|
|
}
|
|
|
|
func (p *ZFSProperties) Get(key string) string {
|
|
return p.m[key].Value
|
|
}
|
|
|
|
func (p *ZFSProperties) GetDetails(key string) PropertyValue {
|
|
return p.m[key]
|
|
}
|
|
|
|
func zfsSet(ctx context.Context, path string, props map[string]string) error {
|
|
args := make([]string, 0)
|
|
args = append(args, "set")
|
|
|
|
for prop, val := range props {
|
|
if strings.Contains(prop, "=") {
|
|
return errors.New("prop contains rune '=' which is the delimiter between property name and value")
|
|
}
|
|
args = append(args, fmt.Sprintf("%s=%s", prop, val))
|
|
}
|
|
|
|
args = append(args, path)
|
|
|
|
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
|
|
stdio, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
err = &ZFSError{
|
|
Stderr: stdio,
|
|
WaitErr: err,
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func ZFSSet(ctx context.Context, fs *DatasetPath, props map[string]string) error {
|
|
return zfsSet(ctx, fs.ToString(), props)
|
|
}
|
|
|
|
func ZFSGet(ctx context.Context, fs *DatasetPath, props []string) (*ZFSProperties, error) {
|
|
return zfsGet(ctx, fs.ToString(), props, SourceAny)
|
|
}
|
|
|
|
// The returned error includes requested filesystem and version as quoted strings in its error message
|
|
func ZFSGetGUID(ctx context.Context, fs string, version string) (g uint64, err error) {
|
|
defer func(e *error) {
|
|
if *e != nil {
|
|
*e = fmt.Errorf("zfs get guid fs=%q version=%q: %s", fs, version, *e)
|
|
}
|
|
}(&err)
|
|
if err := validateZFSFilesystem(fs); err != nil {
|
|
return 0, err
|
|
}
|
|
if len(version) == 0 {
|
|
return 0, errors.New("version must have non-zero length")
|
|
}
|
|
if strings.IndexAny(version[0:1], "@#") != 0 {
|
|
return 0, errors.New("version does not start with @ or #")
|
|
}
|
|
path := fmt.Sprintf("%s%s", fs, version)
|
|
props, err := zfsGet(ctx, path, []string{"guid"}, SourceAny) // always local
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return strconv.ParseUint(props.Get("guid"), 10, 64)
|
|
}
|
|
|
|
type GetMountpointOutput struct {
|
|
Mounted bool
|
|
Mountpoint string
|
|
}
|
|
|
|
func ZFSGetMountpoint(ctx context.Context, fs string) (*GetMountpointOutput, error) {
|
|
if err := EntityNamecheck(fs, EntityTypeFilesystem); err != nil {
|
|
return nil, err
|
|
}
|
|
props, err := zfsGet(ctx, fs, []string{"mountpoint", "mounted"}, SourceAny)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
o := &GetMountpointOutput{}
|
|
o.Mounted = props.Get("mounted") == "yes"
|
|
o.Mountpoint = props.Get("mountpoint")
|
|
if o.Mountpoint == "none" {
|
|
o.Mountpoint = ""
|
|
}
|
|
if o.Mounted && o.Mountpoint == "" {
|
|
panic("unexpected zfs get output")
|
|
}
|
|
return o, nil
|
|
}
|
|
|
|
func ZFSGetRawAnySource(ctx context.Context, path string, props []string) (*ZFSProperties, error) {
|
|
return zfsGet(ctx, path, props, SourceAny)
|
|
}
|
|
|
|
var zfsGetDatasetDoesNotExistRegexp = regexp.MustCompile(`^cannot open '([^)]+)': (dataset does not exist|no such pool or dataset)`) // verified in platformtest
|
|
|
|
type DatasetDoesNotExist struct {
|
|
Path string
|
|
}
|
|
|
|
func (d *DatasetDoesNotExist) Error() string { return fmt.Sprintf("dataset %q does not exist", d.Path) }
|
|
|
|
func tryDatasetDoesNotExist(expectPath string, stderr []byte) *DatasetDoesNotExist {
|
|
if sm := zfsGetDatasetDoesNotExistRegexp.FindSubmatch(stderr); sm != nil {
|
|
if string(sm[1]) == expectPath {
|
|
return &DatasetDoesNotExist{expectPath}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
//go:generate enumer -type=PropertySource -trimprefix=Source
|
|
type PropertySource uint32
|
|
|
|
const (
|
|
SourceLocal PropertySource = 1 << iota
|
|
SourceDefault
|
|
SourceInherited
|
|
SourceNone
|
|
SourceTemporary
|
|
SourceReceived
|
|
|
|
SourceAny PropertySource = ^PropertySource(0)
|
|
)
|
|
|
|
var propertySourceParseLUT = map[string]PropertySource{
|
|
"local": SourceLocal,
|
|
"default": SourceDefault,
|
|
"inherited": SourceInherited,
|
|
"-": SourceNone,
|
|
"temporary": SourceTemporary,
|
|
"received": SourceReceived,
|
|
}
|
|
|
|
func parsePropertySource(s string) (PropertySource, error) {
|
|
fields := strings.Fields(s)
|
|
if len(fields) > 0 {
|
|
v, ok := propertySourceParseLUT[fields[0]]
|
|
if ok {
|
|
return v, nil
|
|
}
|
|
// fallthrough
|
|
}
|
|
return 0, fmt.Errorf("unknown property source %q", s)
|
|
}
|
|
|
|
func (s PropertySource) zfsGetSourceFieldPrefixes() []string {
|
|
prefixes := make([]string, 0, 7)
|
|
if s&SourceLocal != 0 {
|
|
prefixes = append(prefixes, "local")
|
|
}
|
|
if s&SourceDefault != 0 {
|
|
prefixes = append(prefixes, "default")
|
|
}
|
|
if s&SourceInherited != 0 {
|
|
prefixes = append(prefixes, "inherited")
|
|
}
|
|
if s&SourceNone != 0 {
|
|
prefixes = append(prefixes, "-")
|
|
}
|
|
if s&SourceTemporary != 0 {
|
|
prefixes = append(prefixes, "temporary")
|
|
}
|
|
if s&SourceReceived != 0 {
|
|
prefixes = append(prefixes, "received")
|
|
}
|
|
if s == SourceAny {
|
|
prefixes = append(prefixes, "")
|
|
}
|
|
return prefixes
|
|
}
|
|
|
|
func zfsGetRecursive(ctx context.Context, path string, depth int, dstypes []string, props []string, allowedSources PropertySource) (map[string]*ZFSProperties, error) {
|
|
args := []string{"get", "-Hp", "-o", "name,property,value,source"}
|
|
if depth != 0 {
|
|
args = append(args, "-r")
|
|
if depth != -1 {
|
|
args = append(args, "-d", fmt.Sprintf("%d", depth))
|
|
}
|
|
}
|
|
if len(dstypes) > 0 {
|
|
args = append(args, "-t", strings.Join(dstypes, ","))
|
|
}
|
|
args = append(args, strings.Join(props, ","), path)
|
|
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
|
|
stdout, err := cmd.Output()
|
|
if err != nil {
|
|
if exitErr, ok := err.(*exec.ExitError); ok {
|
|
if exitErr.Exited() {
|
|
// screen-scrape output
|
|
if ddne := tryDatasetDoesNotExist(path, exitErr.Stderr); ddne != nil {
|
|
return nil, ddne
|
|
}
|
|
}
|
|
return nil, &ZFSError{
|
|
Stderr: exitErr.Stderr,
|
|
WaitErr: exitErr,
|
|
}
|
|
}
|
|
return nil, err
|
|
}
|
|
o := string(stdout)
|
|
lines := strings.Split(o, "\n")
|
|
propsByFS := make(map[string]*ZFSProperties)
|
|
allowedPrefixes := allowedSources.zfsGetSourceFieldPrefixes()
|
|
for _, line := range lines[:len(lines)-1] { // last line is an empty line due to how strings.Split works
|
|
fields := strings.FieldsFunc(line, func(r rune) bool {
|
|
return r == '\t'
|
|
})
|
|
if len(fields) != 4 {
|
|
return nil, fmt.Errorf("zfs get did not return name,property,value,source tuples")
|
|
}
|
|
for _, p := range allowedPrefixes {
|
|
// prefix-match so that SourceAny (= "") works
|
|
if strings.HasPrefix(fields[3], p) {
|
|
source, err := parsePropertySource(fields[3])
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "parse property source")
|
|
}
|
|
fsProps, ok := propsByFS[fields[0]]
|
|
if !ok {
|
|
fsProps = &ZFSProperties{
|
|
make(map[string]PropertyValue),
|
|
}
|
|
}
|
|
if _, ok := fsProps.m[fields[1]]; ok {
|
|
return nil, errors.Errorf("duplicate property %q for dataset %q", fields[1], fields[0])
|
|
}
|
|
fsProps.m[fields[1]] = PropertyValue{
|
|
Value: fields[2],
|
|
Source: source,
|
|
}
|
|
propsByFS[fields[0]] = fsProps
|
|
break
|
|
}
|
|
}
|
|
}
|
|
// validate we got expected output
|
|
for fs, fsProps := range propsByFS {
|
|
if len(fsProps.m) != len(props) {
|
|
return nil, errors.Errorf("zfs get did not return all requested values for dataset %q\noutput was:\n%s", fs, o)
|
|
}
|
|
}
|
|
return propsByFS, nil
|
|
}
|
|
|
|
func zfsGet(ctx context.Context, path string, props []string, allowedSources PropertySource) (*ZFSProperties, error) {
|
|
propMap, err := zfsGetRecursive(ctx, path, 0, nil, props, allowedSources)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(propMap) == 0 {
|
|
// XXX callers expect to always get a result here
|
|
// They will observe props.Get("propname") == ""
|
|
// We should change .Get to return a tuple, or an error, or whatever.
|
|
return &ZFSProperties{make(map[string]PropertyValue)}, nil
|
|
}
|
|
if len(propMap) != 1 {
|
|
return nil, errors.Errorf("zfs get unexpectedly returned properties for multiple datasets")
|
|
}
|
|
res, ok := propMap[path]
|
|
if !ok {
|
|
return nil, errors.Errorf("zfs get returned properties for a different dataset that requested")
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
type DestroySnapshotsError struct {
|
|
RawLines []string
|
|
Filesystem string
|
|
Undestroyable []string // snapshot name only (filesystem@ stripped)
|
|
Reason []string
|
|
}
|
|
|
|
func (e *DestroySnapshotsError) Error() string {
|
|
if len(e.Undestroyable) != len(e.Reason) {
|
|
panic(fmt.Sprintf("%v != %v", len(e.Undestroyable), len(e.Reason)))
|
|
}
|
|
if len(e.Undestroyable) == 0 {
|
|
panic(fmt.Sprintf("error must have one undestroyable snapshot, %q", e.Filesystem))
|
|
}
|
|
if len(e.Undestroyable) == 1 {
|
|
return fmt.Sprintf("zfs destroy failed: %s@%s: %s", e.Filesystem, e.Undestroyable[0], e.Reason[0])
|
|
}
|
|
return strings.Join(e.RawLines, "\n")
|
|
}
|
|
|
|
var destroySnapshotsErrorRegexp = regexp.MustCompile(`^cannot destroy snapshot ([^@]+)@(.+): (.*)$`) // yes, datasets can contain `:`
|
|
|
|
var destroyOneOrMoreSnapshotsNoneExistedErrorRegexp = regexp.MustCompile(`^could not find any snapshots to destroy; check snapshot names.`)
|
|
|
|
var destroyBookmarkDoesNotExist = regexp.MustCompile(`^bookmark '([^']+)' does not exist`)
|
|
|
|
func tryParseDestroySnapshotsError(arg string, stderr []byte) *DestroySnapshotsError {
|
|
|
|
argComps := strings.SplitN(arg, "@", 2)
|
|
if len(argComps) != 2 {
|
|
return nil
|
|
}
|
|
filesystem := argComps[0]
|
|
|
|
lines := bufio.NewScanner(bytes.NewReader(stderr))
|
|
undestroyable := []string{}
|
|
reason := []string{}
|
|
rawLines := []string{}
|
|
for lines.Scan() {
|
|
line := lines.Text()
|
|
rawLines = append(rawLines, line)
|
|
m := destroySnapshotsErrorRegexp.FindStringSubmatch(line)
|
|
if m == nil {
|
|
return nil // unexpected line => be conservative
|
|
} else {
|
|
if m[1] != filesystem {
|
|
return nil // unexpected line => be conservative
|
|
}
|
|
undestroyable = append(undestroyable, m[2])
|
|
reason = append(reason, m[3])
|
|
}
|
|
}
|
|
if len(undestroyable) == 0 {
|
|
return nil
|
|
}
|
|
|
|
return &DestroySnapshotsError{
|
|
RawLines: rawLines,
|
|
Filesystem: filesystem,
|
|
Undestroyable: undestroyable,
|
|
Reason: reason,
|
|
}
|
|
}
|
|
|
|
func ZFSDestroy(ctx context.Context, arg string) (err error) {
|
|
|
|
var dstype, filesystem string
|
|
idx := strings.IndexAny(arg, "@#")
|
|
if idx == -1 {
|
|
dstype = "filesystem"
|
|
filesystem = arg
|
|
} else {
|
|
switch arg[idx] {
|
|
case '@':
|
|
dstype = "snapshot"
|
|
case '#':
|
|
dstype = "bookmark"
|
|
}
|
|
filesystem = arg[:idx]
|
|
}
|
|
|
|
defer prometheus.NewTimer(prom.ZFSDestroyDuration.WithLabelValues(dstype, filesystem))
|
|
|
|
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "destroy", arg)
|
|
stdio, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
err = &ZFSError{
|
|
Stderr: stdio,
|
|
WaitErr: err,
|
|
}
|
|
|
|
if destroyOneOrMoreSnapshotsNoneExistedErrorRegexp.Match(stdio) {
|
|
err = &DatasetDoesNotExist{arg}
|
|
} else if match := destroyBookmarkDoesNotExist.FindStringSubmatch(string(stdio)); match != nil && match[1] == arg {
|
|
err = &DatasetDoesNotExist{arg}
|
|
} else if dsNotExistErr := tryDatasetDoesNotExist(filesystem, stdio); dsNotExistErr != nil {
|
|
err = dsNotExistErr
|
|
} else if dserr := tryParseDestroySnapshotsError(arg, stdio); dserr != nil {
|
|
err = dserr
|
|
}
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
func ZFSDestroyIdempotent(ctx context.Context, path string) error {
|
|
err := ZFSDestroy(ctx, path)
|
|
if _, ok := err.(*DatasetDoesNotExist); ok {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
func ZFSSnapshot(ctx context.Context, fs *DatasetPath, name string, recursive bool) (err error) {
|
|
|
|
promTimer := prometheus.NewTimer(prom.ZFSSnapshotDuration.WithLabelValues(fs.ToString()))
|
|
defer promTimer.ObserveDuration()
|
|
|
|
snapname := fmt.Sprintf("%s@%s", fs.ToString(), name)
|
|
if err := EntityNamecheck(snapname, EntityTypeSnapshot); err != nil {
|
|
return errors.Wrap(err, "zfs snapshot")
|
|
}
|
|
|
|
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "snapshot", snapname)
|
|
stdio, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
err = &ZFSError{
|
|
Stderr: stdio,
|
|
WaitErr: err,
|
|
}
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
var zfsBookmarkExistsRegex = regexp.MustCompile("^cannot create bookmark '[^']+': bookmark exists")
|
|
|
|
type BookmarkExists struct {
|
|
zfsMsg string
|
|
fs, bookmark string
|
|
bookmarkOrigin ZFSSendArgVersion
|
|
bookGuid uint64
|
|
}
|
|
|
|
func (e *BookmarkExists) Error() string {
|
|
return fmt.Sprintf("bookmark %s (guid=%v) with #%s: bookmark #%s exists but has different guid (%v)",
|
|
e.bookmarkOrigin.FullPath(e.fs), e.bookmarkOrigin.GUID, e.bookmark, e.bookmark, e.bookGuid,
|
|
)
|
|
}
|
|
|
|
var ErrBookmarkCloningNotSupported = fmt.Errorf("bookmark cloning feature is not yet supported by ZFS")
|
|
|
|
// idempotently create bookmark of the given version v
|
|
//
|
|
// if `v` is a bookmark, returns ErrBookmarkCloningNotSupported
|
|
// unless a bookmark with the name `bookmark` exists and has the same idenitty (zfs.FilesystemVersionEqualIdentity)
|
|
//
|
|
// v must be validated by the caller
|
|
func ZFSBookmark(ctx context.Context, fs string, v FilesystemVersion, bookmark string) (bm FilesystemVersion, err error) {
|
|
|
|
bm = FilesystemVersion{
|
|
Type: Bookmark,
|
|
Name: bookmark,
|
|
UserRefs: OptionUint64{Valid: false},
|
|
// bookmarks have the same createtxg, guid and creation as their origin
|
|
CreateTXG: v.CreateTXG,
|
|
Guid: v.Guid,
|
|
Creation: v.Creation,
|
|
}
|
|
|
|
promTimer := prometheus.NewTimer(prom.ZFSBookmarkDuration.WithLabelValues(fs))
|
|
defer promTimer.ObserveDuration()
|
|
|
|
bookmarkname := fmt.Sprintf("%s#%s", fs, bookmark)
|
|
if err := EntityNamecheck(bookmarkname, EntityTypeBookmark); err != nil {
|
|
return bm, err
|
|
}
|
|
|
|
if v.IsBookmark() {
|
|
existingBm, err := ZFSGetFilesystemVersion(ctx, bookmarkname)
|
|
if _, ok := err.(*DatasetDoesNotExist); ok {
|
|
return bm, ErrBookmarkCloningNotSupported
|
|
} else if err != nil {
|
|
return bm, errors.Wrap(err, "bookmark: idempotency check for bookmark cloning")
|
|
}
|
|
if FilesystemVersionEqualIdentity(bm, existingBm) {
|
|
return existingBm, nil
|
|
}
|
|
return bm, ErrBookmarkCloningNotSupported // TODO This is work in progress: https://github.com/zfsonlinux/zfs/pull/9571
|
|
}
|
|
|
|
snapname := v.FullPath(fs)
|
|
if err := EntityNamecheck(snapname, EntityTypeSnapshot); err != nil {
|
|
return bm, err
|
|
}
|
|
|
|
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, "bookmark", snapname, bookmarkname)
|
|
stdio, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
if ddne := tryDatasetDoesNotExist(snapname, stdio); ddne != nil {
|
|
return bm, ddne
|
|
} else if zfsBookmarkExistsRegex.Match(stdio) {
|
|
|
|
// check if this was idempotent
|
|
bookGuid, err := ZFSGetGUID(ctx, fs, "#"+bookmark)
|
|
if err != nil {
|
|
return bm, errors.Wrap(err, "bookmark: idempotency check for bookmark creation") // guid error expressive enough
|
|
}
|
|
|
|
if v.Guid == bookGuid {
|
|
debug("bookmark: %q %q was idempotent: {snap,book}guid %d == %d", snapname, bookmarkname, v.Guid, bookGuid)
|
|
return bm, nil
|
|
}
|
|
return bm, &BookmarkExists{
|
|
fs: fs, bookmarkOrigin: v.ToSendArgVersion(), bookmark: bookmark,
|
|
zfsMsg: string(stdio),
|
|
bookGuid: bookGuid,
|
|
}
|
|
|
|
} else {
|
|
return bm, &ZFSError{
|
|
Stderr: stdio,
|
|
WaitErr: err,
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
return bm, nil
|
|
}
|
|
|
|
func ZFSRollback(ctx context.Context, fs *DatasetPath, snapshot FilesystemVersion, rollbackArgs ...string) (err error) {
|
|
|
|
snapabs := snapshot.ToAbsPath(fs)
|
|
if snapshot.Type != Snapshot {
|
|
return fmt.Errorf("can only rollback to snapshots, got %s", snapabs)
|
|
}
|
|
|
|
args := []string{"rollback"}
|
|
args = append(args, rollbackArgs...)
|
|
args = append(args, snapabs)
|
|
|
|
cmd := zfscmd.CommandContext(ctx, ZFS_BINARY, args...)
|
|
stdio, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
err = &ZFSError{
|
|
Stderr: stdio,
|
|
WaitErr: err,
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|