endpoint: refactor, fix stale holds on initial replication, holds release subcmds

- endpoint abstractions now share an interface `Abstraction`
- pkg endpoint now has a query facitilty (`ListAbstractions`) which is
  used to find on-disk
    - step holds and bookmarks
    - replication cursors (v1, v2)
    - last-received-holds
- the `zrepl holds list` command consumes endpoint.ListAbstractions
- the new `zrepl holds release-{all,stale}` commands can be used
  to remove abstractions of package endpoint

Co-authored-by: InsanePrawn <insane.prawny@gmail.com>

supersedes #282
fixes #280
fixes #278
This commit is contained in:
Christian Schwarz 2020-03-26 23:43:17 +01:00
parent 44bd354eae
commit f3734ed0d4
28 changed files with 1854 additions and 1168 deletions

View File

@ -1,13 +1,11 @@
package client
import (
"context"
"encoding/json"
"fmt"
"os"
"sort"
"strings"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/zrepl/zrepl/cli"
"github.com/zrepl/zrepl/daemon/filters"
@ -20,68 +18,126 @@ var (
Use: "holds",
Short: "manage holds & step bookmarks",
SetupSubcommands: func() []*cli.Subcommand {
return holdsList
return []*cli.Subcommand{
holdsCmdList,
holdsCmdReleaseAll,
holdsCmdReleaseStale,
}
},
}
)
var holdsList = []*cli.Subcommand{
&cli.Subcommand{
Use: "list [FSFILTER]",
Run: doHoldsList,
NoRequireConfig: true,
Short: `
FSFILTER SYNTAX:
representation of a 'filesystems' filter statement on the command line
`,
},
// a common set of CLI flags that map to the fields of an
// endpoint.ListZFSHoldsAndBookmarksQuery
type holdsFilterFlags struct {
Filesystems FilesystemsFilterFlag
Job JobIDFlag
Types AbstractionTypesFlag
}
func fsfilterFromCliArg(arg string) (zfs.DatasetFilter, error) {
mappings := strings.Split(arg, ",")
// produce a query from the CLI flags
func (f holdsFilterFlags) Query() (endpoint.ListZFSHoldsAndBookmarksQuery, error) {
q := endpoint.ListZFSHoldsAndBookmarksQuery{
FS: f.Filesystems.FlagValue(),
What: f.Types.FlagValue(),
JobID: f.Job.FlagValue(),
Until: nil, // TODO support this as a flag
}
return q, q.Validate()
}
func (f *holdsFilterFlags) registerHoldsFilterFlags(s *pflag.FlagSet, verb string) {
// Note: the default value is defined in the .FlagValue methods
s.Var(&f.Filesystems, "fs", fmt.Sprintf("only %s holds on the specified filesystem [default: all filesystems] [comma-separated list of <dataset-pattern>:<ok|!> pairs]", verb))
s.Var(&f.Job, "job", fmt.Sprintf("only %s holds created by the specified job [default: any job]", verb))
variants := make([]string, 0, len(endpoint.AbstractionTypesAll))
for v := range endpoint.AbstractionTypesAll {
variants = append(variants, string(v))
}
variants = sort.StringSlice(variants)
variantsJoined := strings.Join(variants, "|")
s.Var(&f.Types, "type", fmt.Sprintf("only %s holds of the specified type [default: all] [comma-separated list of %s]", verb, variantsJoined))
}
type JobIDFlag struct{ J *endpoint.JobID }
func (f *JobIDFlag) Set(s string) error {
if len(s) == 0 {
*f = JobIDFlag{J: nil}
return nil
}
jobID, err := endpoint.MakeJobID(s)
if err != nil {
return err
}
*f = JobIDFlag{J: &jobID}
return nil
}
func (f JobIDFlag) Type() string { return "job-ID" }
func (f JobIDFlag) String() string { return fmt.Sprint(f.J) }
func (f JobIDFlag) FlagValue() *endpoint.JobID { return f.J }
type AbstractionTypesFlag map[endpoint.AbstractionType]bool
func (f *AbstractionTypesFlag) Set(s string) error {
ats, err := endpoint.AbstractionTypeSetFromStrings(strings.Split(s, ","))
if err != nil {
return err
}
*f = AbstractionTypesFlag(ats)
return nil
}
func (f AbstractionTypesFlag) Type() string { return "abstraction-type" }
func (f AbstractionTypesFlag) String() string {
return endpoint.AbstractionTypeSet(f).String()
}
func (f AbstractionTypesFlag) FlagValue() map[endpoint.AbstractionType]bool {
if len(f) > 0 {
return f
}
return endpoint.AbstractionTypesAll
}
type FilesystemsFilterFlag struct {
F endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter
}
func (flag *FilesystemsFilterFlag) Set(s string) error {
mappings := strings.Split(s, ",")
if len(mappings) == 1 {
flag.F = endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
FS: &mappings[0],
}
return nil
}
f := filters.NewDatasetMapFilter(len(mappings), true)
for _, m := range mappings {
thisMappingErr := fmt.Errorf("expecting comma-separated list of <dataset-pattern>:<ok|!> pairs, got %q", m)
lhsrhs := strings.SplitN(m, ":", 2)
if len(lhsrhs) != 2 {
return nil, thisMappingErr
return thisMappingErr
}
err := f.Add(lhsrhs[0], lhsrhs[1])
if err != nil {
return nil, fmt.Errorf("%s: %s", thisMappingErr, err)
return fmt.Errorf("%s: %s", thisMappingErr, err)
}
}
return f.AsFilter(), nil
}
func doHoldsList(sc *cli.Subcommand, args []string) error {
var err error
ctx := context.Background()
if len(args) > 1 {
return errors.New("this subcommand takes at most one argument")
flag.F = endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{
Filter: f,
}
var filter zfs.DatasetFilter
if len(args) == 0 {
filter = zfs.NoFilter()
} else {
filter, err = fsfilterFromCliArg(args[0])
if err != nil {
return errors.Wrap(err, "cannot parse filesystem filter args")
}
}
listing, err := endpoint.ListZFSHoldsAndBookmarks(ctx, filter)
if err != nil {
return err // context clear by invocation of command
}
enc := json.NewEncoder(os.Stdout)
enc.SetIndent(" ", " ")
if err := enc.Encode(listing); err != nil {
panic(err)
}
return nil
}
func (flag FilesystemsFilterFlag) Type() string { return "filesystem filter spec" }
func (flag FilesystemsFilterFlag) String() string {
return fmt.Sprintf("%v", flag.F)
}
func (flag FilesystemsFilterFlag) FlagValue() endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter {
var z FilesystemsFilterFlag
if flag == z {
return endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{Filter: zfs.NoFilter()}
}
return flag.F
}

71
client/holds_list.go Normal file
View File

@ -0,0 +1,71 @@
package client
import (
"context"
"encoding/json"
"fmt"
"os"
"github.com/fatih/color"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/zrepl/zrepl/cli"
"github.com/zrepl/zrepl/endpoint"
)
var holdsListFlags struct {
Filter holdsFilterFlags
Json bool
}
var holdsCmdList = &cli.Subcommand{
Use: "list",
Run: doHoldsList,
NoRequireConfig: true,
Short: "list holds and bookmarks",
SetupFlags: func(f *pflag.FlagSet) {
holdsListFlags.Filter.registerHoldsFilterFlags(f, "list")
f.BoolVar(&holdsListFlags.Json, "json", false, "emit JSON")
},
}
func doHoldsList(sc *cli.Subcommand, args []string) error {
var err error
ctx := context.Background()
if len(args) > 0 {
return errors.New("this subcommand takes no positional arguments")
}
q, err := holdsListFlags.Filter.Query()
if err != nil {
return errors.Wrap(err, "invalid filter specification on command line")
}
abstractions, errors, err := endpoint.ListAbstractions(ctx, q)
if err != nil {
return err // context clear by invocation of command
}
// always print what we got
if holdsListFlags.Json {
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(abstractions); err != nil {
panic(err)
}
fmt.Println()
} else {
for _, a := range abstractions {
fmt.Println(a)
}
}
// then potential errors, so that users always see them
if len(errors) > 0 {
color.New(color.FgRed).Fprintf(os.Stderr, "there were errors in listing the abstractions:\n%s\n", errors)
return fmt.Errorf("")
} else {
return nil
}
}

144
client/holds_release.go Normal file
View File

@ -0,0 +1,144 @@
package client
import (
"context"
"encoding/json"
"fmt"
"os"
"github.com/fatih/color"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/zrepl/zrepl/cli"
"github.com/zrepl/zrepl/endpoint"
)
// shared between release-all and release-step
var holdsReleaseFlags struct {
Filter holdsFilterFlags
Json bool
DryRun bool
}
func registerHoldsReleaseFlags(s *pflag.FlagSet) {
holdsReleaseFlags.Filter.registerHoldsFilterFlags(s, "release")
s.BoolVar(&holdsReleaseFlags.Json, "json", false, "emit json instead of pretty-printed")
s.BoolVar(&holdsReleaseFlags.DryRun, "dry-run", false, "do a dry-run")
}
var holdsCmdReleaseAll = &cli.Subcommand{
Use: "release-all",
Run: doHoldsReleaseAll,
NoRequireConfig: true,
Short: `(DANGEROUS) release all zrepl-managed holds and bookmarks, mostly useful for uninstalling zrepl`,
SetupFlags: registerHoldsReleaseFlags,
}
var holdsCmdReleaseStale = &cli.Subcommand{
Use: "release-stale",
Run: doHoldsReleaseStale,
NoRequireConfig: true,
Short: `release stale zrepl-managed holds and boomkarks (useful if zrepl has a bug and doesn't do it by itself)`,
SetupFlags: registerHoldsReleaseFlags,
}
func doHoldsReleaseAll(sc *cli.Subcommand, args []string) error {
var err error
ctx := context.Background()
if len(args) > 0 {
return errors.New("this subcommand takes no positional arguments")
}
q, err := holdsReleaseFlags.Filter.Query()
if err != nil {
return errors.Wrap(err, "invalid filter specification on command line")
}
abstractions, listErrors, err := endpoint.ListAbstractions(ctx, q)
if err != nil {
return err // context clear by invocation of command
}
if len(listErrors) > 0 {
color.New(color.FgRed).Fprintf(os.Stderr, "there were errors in listing the abstractions:\n%s\n", listErrors)
// proceed anyways with rest of abstractions
}
return doHoldsRelease_Common(ctx, abstractions)
}
func doHoldsReleaseStale(sc *cli.Subcommand, args []string) error {
var err error
ctx := context.Background()
if len(args) > 0 {
return errors.New("this subcommand takes no positional arguments")
}
q, err := holdsReleaseFlags.Filter.Query()
if err != nil {
return errors.Wrap(err, "invalid filter specification on command line")
}
stalenessInfo, err := endpoint.ListStale(ctx, q)
if err != nil {
return err // context clear by invocation of command
}
return doHoldsRelease_Common(ctx, stalenessInfo.Stale)
}
func doHoldsRelease_Common(ctx context.Context, destroy []endpoint.Abstraction) error {
if holdsReleaseFlags.DryRun {
if holdsReleaseFlags.Json {
m, err := json.MarshalIndent(destroy, "", " ")
if err != nil {
panic(err)
}
if _, err := os.Stdout.Write(m); err != nil {
panic(err)
}
fmt.Println()
} else {
for _, a := range destroy {
fmt.Printf("would destroy %s\n", a)
}
}
return nil
}
outcome := endpoint.BatchDestroy(ctx, destroy)
hadErr := false
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
colorErr := color.New(color.FgRed)
printfSuccess := color.New(color.FgGreen).FprintfFunc()
printfSection := color.New(color.Bold).FprintfFunc()
for res := range outcome {
hadErr = hadErr || res.DestroyErr != nil
if holdsReleaseFlags.Json {
err := enc.Encode(res)
if err != nil {
colorErr.Fprintf(os.Stderr, "cannot marshal there were errors in destroying the abstractions")
}
} else {
printfSection(os.Stdout, "destroy %s ...", res.Abstraction)
if res.DestroyErr != nil {
colorErr.Fprintf(os.Stdout, " failed:\n%s\n", res.DestroyErr)
} else {
printfSuccess(os.Stdout, " OK\n")
}
}
}
if hadErr {
colorErr.Add(color.Bold).Fprintf(os.Stderr, "there were errors in destroying the abstractions")
return fmt.Errorf("")
} else {
return nil
}
}

View File

@ -120,7 +120,7 @@ The following steps take place during replication and can be monitored using the
* Perform replication steps in the following order:
Among all filesystems with pending replication steps, pick the filesystem whose next replication step's snapshot is the oldest.
* Create placeholder filesystems on the receiving side to mirror the dataset paths on the sender to ``root_fs/${client_identity}``.
* Acquire send-side step-holds on the step's `from` and `to` snapshots.
* Acquire send-side *step-holds* on the step's `from` and `to` snapshots.
* Perform the replication step.
* Move the **replication cursor** bookmark on the sending side (see below).
* Move the **last-received-hold** on the receiving side (see below).

View File

@ -38,6 +38,8 @@ CLI Overview
* - ``zrepl migrate``
- | perform on-disk state / ZFS property migrations
| (see :ref:`changelog <changelog>` for details)
* - ``zrepl holds``
- list and remove holds and step bookmarks created by zrepl (see :ref:`overview <replication-cursor-and-last-received-hold>` )
.. _usage-zrepl-daemon:

View File

@ -113,25 +113,40 @@ func (p *Sender) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMo
var err error
fs := r.GetFilesystem()
mostRecent, err := sendArgsFromPDUAndValidateExists(ctx, fs, r.GetSenderVersion())
log := getLogger(ctx).WithField("fs", fs).WithField("hinted_most_recent", fmt.Sprintf("%#v", r.GetSenderVersion()))
// FIXME check if fs exists AND IS PERMITTED by p.filesystems
log.WithField("full_hint", r).Debug("full hint")
if r.GetSenderVersion() == nil {
// no common ancestor found, likely due to failed prior replication attempt
// => release stale step holds to prevent them from accumulating
// (they can accumulate on initial replication because each inital replication step might hold a different `to`)
// => replication cursors cannot accumulate because we always _move_ the replication cursor
log.Debug("releasing all step holds on the filesystem")
TryReleaseStepStaleFS(ctx, fs, p.jobId)
return &pdu.HintMostRecentCommonAncestorRes{}, nil
}
// we were hinted a specific common ancestor
mostRecentVersion, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, r.GetSenderVersion())
if err != nil {
msg := "HintMostRecentCommonAncestor rpc with nonexistent most recent version"
getLogger(ctx).WithField("fs", fs).WithField("hinted_most_recent", fmt.Sprintf("%#v", mostRecent)).
Warn(msg)
log.Warn(msg)
return nil, errors.Wrap(err, msg)
}
// move replication cursor to this position
_, err = MoveReplicationCursor(ctx, fs, mostRecent, p.jobId)
_, err = MoveReplicationCursor(ctx, fs, mostRecentVersion, p.jobId)
if err == zfs.ErrBookmarkCloningNotSupported {
getLogger(ctx).Debug("not creating replication cursor from bookmark because ZFS does not support it")
log.Debug("not creating replication cursor from bookmark because ZFS does not support it")
// fallthrough
} else if err != nil {
return nil, errors.Wrap(err, "cannot set replication cursor to hinted version")
}
// cleanup previous steps
if err := ReleaseStepAll(ctx, fs, mostRecent, p.jobId); err != nil {
if err := ReleaseStepCummulativeInclusive(ctx, fs, mostRecentVersion, p.jobId); err != nil {
return nil, errors.Wrap(err, "cannot cleanup prior invocation's step holds and bookmarks")
}
@ -147,15 +162,16 @@ func uncheckedSendArgsFromPDU(fsv *pdu.FilesystemVersion) *zfs.ZFSSendArgVersion
return &zfs.ZFSSendArgVersion{RelName: fsv.GetRelName(), GUID: fsv.Guid}
}
func sendArgsFromPDUAndValidateExists(ctx context.Context, fs string, fsv *pdu.FilesystemVersion) (*zfs.ZFSSendArgVersion, error) {
v := uncheckedSendArgsFromPDU(fsv)
if v == nil {
return nil, errors.New("must not be nil")
func sendArgsFromPDUAndValidateExistsAndGetVersion(ctx context.Context, fs string, fsv *pdu.FilesystemVersion) (v zfs.FilesystemVersion, err error) {
sendArgs := uncheckedSendArgsFromPDU(fsv)
if sendArgs == nil {
return v, errors.New("must not be nil")
}
if err := v.ValidateExists(ctx, fs); err != nil {
return nil, err
version, err := sendArgs.ValidateExistsAndGetVersion(ctx, fs)
if err != nil {
return v, err
}
return v, nil
return version, nil
}
func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) {
@ -182,7 +198,7 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.St
return nil, nil, fmt.Errorf("unknown pdu.Tri variant %q", r.Encrypted)
}
sendArgs := zfs.ZFSSendArgs{
sendArgsUnvalidated := zfs.ZFSSendArgsUnvalidated{
FS: r.Filesystem,
From: uncheckedSendArgsFromPDU(r.GetFrom()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
To: uncheckedSendArgsFromPDU(r.GetTo()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
@ -190,6 +206,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.St
ResumeToken: r.ResumeToken, // nil or not nil, depending on decoding success
}
sendArgs, err := sendArgsUnvalidated.Validate(ctx)
if err != nil {
return nil, nil, errors.Wrap(err, "validate send arguments")
}
getLogger(ctx).Debug("acquire concurrent send semaphore")
// TODO use try-acquire and fail with resource-exhaustion rpc status
// => would require handling on the client-side
@ -224,7 +245,7 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.St
// update replication cursor
if sendArgs.From != nil {
// For all but the first replication, this should always be a no-op because SendCompleted already moved the cursor
_, err = MoveReplicationCursor(ctx, sendArgs.FS, sendArgs.From, s.jobId)
_, err = MoveReplicationCursor(ctx, sendArgs.FS, sendArgs.FromVersion, s.jobId)
if err == zfs.ErrBookmarkCloningNotSupported {
getLogger(ctx).Debug("not creating replication cursor from bookmark because ZFS does not support it")
// fallthrough
@ -235,18 +256,18 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.St
// make sure `From` doesn't go away in order to make this step resumable
if sendArgs.From != nil {
err := HoldStep(ctx, sendArgs.FS, sendArgs.From, s.jobId)
err := HoldStep(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId)
if err == zfs.ErrBookmarkCloningNotSupported {
getLogger(ctx).Debug("not creating step bookmark because ZFS does not support it")
// fallthrough
} else if err != nil {
return nil, nil, errors.Wrap(err, "cannot create step bookmark")
return nil, nil, errors.Wrapf(err, "cannot hold `from` version %q before starting send", *sendArgs.FromVersion)
}
}
// make sure `To` doesn't go away in order to make this step resumable
err = HoldStep(ctx, sendArgs.FS, sendArgs.To, s.jobId)
err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId)
if err != nil {
return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.To.RelName)
return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion)
}
// step holds & replication cursor released / moved forward in s.SendCompleted => s.moveCursorAndReleaseSendHolds
@ -263,23 +284,24 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p
fs := orig.GetFilesystem()
var err error
var from *zfs.ZFSSendArgVersion
var from *zfs.FilesystemVersion
if orig.GetFrom() != nil {
from, err = sendArgsFromPDUAndValidateExists(ctx, fs, orig.GetFrom()) // no shadow
f, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, orig.GetFrom()) // no shadow
if err != nil {
return nil, errors.Wrap(err, "validate `from` exists")
}
from = &f
}
to, err := sendArgsFromPDUAndValidateExists(ctx, fs, orig.GetTo())
to, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, orig.GetTo())
if err != nil {
return nil, errors.Wrap(err, "validate `to` exists")
}
log := getLogger(ctx).WithField("to_guid", to.GUID).
log := getLogger(ctx).WithField("to_guid", to.Guid).
WithField("fs", fs).
WithField("to", to.RelName)
if from != nil {
log = log.WithField("from", from.RelName).WithField("from_guid", from.GUID)
log = log.WithField("from", from.RelName).WithField("from_guid", from.Guid)
}
log.Debug("move replication cursor to most recent common version")
@ -320,18 +342,14 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p
return
}
log.Debug("release step-hold of or step-bookmark on `from`")
err := ReleaseStep(ctx, fs, from, p.jobId)
err := ReleaseStep(ctx, fs, *from, p.jobId)
if err != nil {
if dne, ok := err.(*zfs.DatasetDoesNotExist); ok {
// If bookmark cloning is not supported, `from` might be the old replication cursor
// and thus have already been destroyed by MoveReplicationCursor above
// In that case, nonexistence of `from` is not an error, otherwise it is.
fsp, err := zfs.NewDatasetPath(fs)
if err != nil {
panic(err) // fs has been validated multiple times above
}
for _, fsv := range destroyedCursors {
if fsv.ToAbsPath(fsp) == dne.Path {
for _, c := range destroyedCursors {
if c.GetFullPath() == dne.Path {
log.Info("`from` was a replication cursor and has already been destroyed")
return
}
@ -610,9 +628,6 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs
if to == nil {
return nil, errors.New("`To` must not be nil")
}
if err := to.ValidateInMemory(lp.ToString()); err != nil {
return nil, errors.Wrap(err, "`To` invalid")
}
if !to.IsSnapshot() {
return nil, errors.New("`To` must be a snapshot")
}
@ -725,7 +740,8 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs
}
// validate that we actually received what the sender claimed
if err := to.ValidateExists(ctx, lp.ToString()); err != nil {
toRecvd, err := to.ValidateExistsAndGetVersion(ctx, lp.ToString())
if err != nil {
msg := "receive request's `To` version does not match what we received in the stream"
getLogger(ctx).WithError(err).WithField("snap", snapFullPath).Error(msg)
getLogger(ctx).Error("aborting recv request, but keeping received snapshot for inspection")
@ -734,7 +750,7 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs
if s.conf.UpdateLastReceivedHold {
getLogger(ctx).Debug("move last-received-hold")
if err := MoveLastReceivedHold(ctx, lp.ToString(), *to, s.conf.JobID); err != nil {
if err := MoveLastReceivedHold(ctx, lp.ToString(), toRecvd, s.conf.JobID); err != nil {
return nil, errors.Wrap(err, "cannot move last-received-hold")
}
}

View File

@ -1,503 +0,0 @@
package endpoint
import (
"context"
"fmt"
"regexp"
"sort"
"github.com/kr/pretty"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/zfs"
)
var stepHoldTagRE = regexp.MustCompile("^zrepl_STEP_J_(.+)")
func StepHoldTag(jobid JobID) (string, error) {
return stepHoldTagImpl(jobid.String())
}
func stepHoldTagImpl(jobid string) (string, error) {
t := fmt.Sprintf("zrepl_STEP_J_%s", jobid)
if err := zfs.ValidHoldTag(t); err != nil {
return "", err
}
return t, nil
}
// err != nil always means that the bookmark is not a step bookmark
func ParseStepHoldTag(tag string) (JobID, error) {
match := stepHoldTagRE.FindStringSubmatch(tag)
if match == nil {
return JobID{}, fmt.Errorf("parse hold tag: match regex %q", stepHoldTagRE)
}
jobID, err := MakeJobID(match[1])
if err != nil {
return JobID{}, errors.Wrap(err, "parse hold tag: invalid job id field")
}
return jobID, nil
}
const stepBookmarkNamePrefix = "zrepl_STEP"
// v must be validated by caller
func StepBookmarkName(fs string, guid uint64, id JobID) (string, error) {
return stepBookmarkNameImpl(fs, guid, id.String())
}
func stepBookmarkNameImpl(fs string, guid uint64, jobid string) (string, error) {
return makeJobAndGuidBookmarkName(stepBookmarkNamePrefix, fs, guid, jobid)
}
// name is the full bookmark name, including dataset path
//
// err != nil always means that the bookmark is not a step bookmark
func ParseStepBookmarkName(fullname string) (guid uint64, jobID JobID, err error) {
guid, jobID, err = parseJobAndGuidBookmarkName(fullname, stepBookmarkNamePrefix)
if err != nil {
err = errors.Wrap(err, "parse step bookmark name") // no shadow!
}
return guid, jobID, err
}
const replicationCursorBookmarkNamePrefix = "zrepl_CURSOR"
func ReplicationCursorBookmarkName(fs string, guid uint64, id JobID) (string, error) {
return replicationCursorBookmarkNameImpl(fs, guid, id.String())
}
func replicationCursorBookmarkNameImpl(fs string, guid uint64, jobid string) (string, error) {
return makeJobAndGuidBookmarkName(replicationCursorBookmarkNamePrefix, fs, guid, jobid)
}
var ErrV1ReplicationCursor = fmt.Errorf("bookmark name is a v1-replication cursor")
//err != nil always means that the bookmark is not a valid replication bookmark
//
// Returns ErrV1ReplicationCursor as error if the bookmark is a v1 replication cursor
func ParseReplicationCursorBookmarkName(fullname string) (uint64, JobID, error) {
// check for legacy cursors
{
if err := zfs.EntityNamecheck(fullname, zfs.EntityTypeBookmark); err != nil {
return 0, JobID{}, errors.Wrap(err, "parse replication cursor bookmark name")
}
_, _, name, err := zfs.DecomposeVersionString(fullname)
if err != nil {
return 0, JobID{}, errors.Wrap(err, "parse replication cursor bookmark name: decompose version string")
}
const V1ReplicationCursorBookmarkName = "zrepl_replication_cursor"
if name == V1ReplicationCursorBookmarkName {
return 0, JobID{}, ErrV1ReplicationCursor
}
}
guid, jobID, err := parseJobAndGuidBookmarkName(fullname, replicationCursorBookmarkNamePrefix)
if err != nil {
err = errors.Wrap(err, "parse replication cursor bookmark name") // no shadow
}
return guid, jobID, err
}
// may return nil for both values, indicating there is no cursor
func GetMostRecentReplicationCursorOfJob(ctx context.Context, fs string, jobID JobID) (*zfs.FilesystemVersion, error) {
fsp, err := zfs.NewDatasetPath(fs)
if err != nil {
return nil, err
}
candidates, err := GetReplicationCursors(ctx, fsp, jobID)
if err != nil || len(candidates) == 0 {
return nil, err
}
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].CreateTXG < candidates[j].CreateTXG
})
mostRecent := candidates[len(candidates)-1]
return &mostRecent, nil
}
func GetReplicationCursors(ctx context.Context, fs *zfs.DatasetPath, jobID JobID) ([]zfs.FilesystemVersion, error) {
listOut := &ListHoldsAndBookmarksOutput{}
if err := listZFSHoldsAndBookmarksImplFS(ctx, listOut, fs); err != nil {
return nil, errors.Wrap(err, "get replication cursor: list bookmarks and holds")
}
if len(listOut.V1ReplicationCursors) > 0 {
getLogger(ctx).WithField("bookmark", pretty.Sprint(listOut.V1ReplicationCursors)).
Warn("found v1-replication cursor bookmarks, consider running migration 'replication-cursor:v1-v2' after successful replication with this zrepl version")
}
candidates := make([]zfs.FilesystemVersion, 0)
for _, v := range listOut.ReplicationCursorBookmarks {
zv := zfs.ZFSSendArgVersion{
RelName: "#" + v.Name,
GUID: v.Guid,
}
if err := zv.ValidateExists(ctx, v.FS); err != nil {
getLogger(ctx).WithError(err).WithField("bookmark", zv.FullPath(v.FS)).
Error("found invalid replication cursor bookmark")
continue
}
candidates = append(candidates, v.v)
}
return candidates, nil
}
// `target` is validated before replication cursor is set. if validation fails, the cursor is not moved.
//
// returns ErrBookmarkCloningNotSupported if version is a bookmark and bookmarking bookmarks is not supported by ZFS
func MoveReplicationCursor(ctx context.Context, fs string, target *zfs.ZFSSendArgVersion, jobID JobID) (destroyedCursors []zfs.FilesystemVersion, err error) {
if !target.IsSnapshot() {
return nil, zfs.ErrBookmarkCloningNotSupported
}
snapProps, err := target.ValidateExistsAndGetCheckedProps(ctx, fs)
if err != nil {
return nil, errors.Wrapf(err, "invalid replication cursor target %q (guid=%v)", target.RelName, target.GUID)
}
bookmarkname, err := ReplicationCursorBookmarkName(fs, snapProps.Guid, jobID)
if err != nil {
return nil, errors.Wrap(err, "determine replication cursor name")
}
// idempotently create bookmark (guid is encoded in it, hence we'll most likely add a new one
// cleanup the old one afterwards
err = zfs.ZFSBookmark(fs, *target, bookmarkname)
if err != nil {
if err == zfs.ErrBookmarkCloningNotSupported {
return nil, err // TODO go1.13 use wrapping
}
return nil, errors.Wrapf(err, "cannot create bookmark")
}
destroyedCursors, err = DestroyObsoleteReplicationCursors(ctx, fs, target, jobID)
if err != nil {
return nil, errors.Wrap(err, "destroy obsolete replication cursors")
}
return destroyedCursors, nil
}
func DestroyObsoleteReplicationCursors(ctx context.Context, fs string, target *zfs.ZFSSendArgVersion, jobID JobID) (destroyed []zfs.FilesystemVersion, err error) {
return destroyBookmarksOlderThan(ctx, fs, target, jobID, func(shortname string) (accept bool) {
_, parsedID, err := ParseReplicationCursorBookmarkName(fs + "#" + shortname)
return err == nil && parsedID == jobID
})
}
// idempotently hold / step-bookmark `version`
//
// returns ErrBookmarkCloningNotSupported if version is a bookmark and bookmarking bookmarks is not supported by ZFS
func HoldStep(ctx context.Context, fs string, v *zfs.ZFSSendArgVersion, jobID JobID) error {
if err := v.ValidateExists(ctx, fs); err != nil {
return err
}
if v.IsSnapshot() {
tag, err := StepHoldTag(jobID)
if err != nil {
return errors.Wrap(err, "step hold tag")
}
if err := zfs.ZFSHold(ctx, fs, *v, tag); err != nil {
return errors.Wrap(err, "step hold: zfs")
}
return nil
}
v.MustBeBookmark()
bmname, err := StepBookmarkName(fs, v.GUID, jobID)
if err != nil {
return errors.Wrap(err, "create step bookmark: determine bookmark name")
}
// idempotently create bookmark
err = zfs.ZFSBookmark(fs, *v, bmname)
if err != nil {
if err == zfs.ErrBookmarkCloningNotSupported {
// TODO we could actually try to find a local snapshot that has the requested GUID
// however, the replication algorithm prefers snapshots anyways, so this quest
// is most likely not going to be successful. Also, there's the possibility that
// the caller might want to filter what snapshots are eligible, and this would
// complicate things even further.
return err // TODO go1.13 use wrapping
}
return errors.Wrap(err, "create step bookmark: zfs")
}
return nil
}
// idempotently release the step-hold on v if v is a snapshot
// or idempotently destroy the step-bookmark of v if v is a bookmark
//
// note that this operation leaves v itself untouched, unless v is the step-bookmark itself, in which case v is destroyed
//
// returns an instance of *zfs.DatasetDoesNotExist if `v` does not exist
func ReleaseStep(ctx context.Context, fs string, v *zfs.ZFSSendArgVersion, jobID JobID) error {
if err := v.ValidateExists(ctx, fs); err != nil {
return err
}
if v.IsSnapshot() {
tag, err := StepHoldTag(jobID)
if err != nil {
return errors.Wrap(err, "step release tag")
}
if err := zfs.ZFSRelease(ctx, tag, v.FullPath(fs)); err != nil {
return errors.Wrap(err, "step release: zfs")
}
return nil
}
v.MustBeBookmark()
bmname, err := StepBookmarkName(fs, v.GUID, jobID)
if err != nil {
return errors.Wrap(err, "step release: determine bookmark name")
}
// idempotently destroy bookmark
if err := zfs.ZFSDestroyIdempotent(bmname); err != nil {
return errors.Wrap(err, "step release: bookmark destroy: zfs")
}
return nil
}
// release {step holds, step bookmarks} earlier and including `mostRecent`
func ReleaseStepAll(ctx context.Context, fs string, mostRecent *zfs.ZFSSendArgVersion, jobID JobID) error {
if err := mostRecent.ValidateInMemory(fs); err != nil {
return err
}
tag, err := StepHoldTag(jobID)
if err != nil {
return errors.Wrap(err, "step release all: tag")
}
err = zfs.ZFSReleaseAllOlderAndIncludingGUID(ctx, fs, mostRecent.GUID, tag)
if err != nil {
return errors.Wrapf(err, "step release all: release holds older and including %q", mostRecent.FullPath(fs))
}
_, err = destroyBookmarksOlderThan(ctx, fs, mostRecent, jobID, func(shortname string) bool {
_, parsedId, parseErr := ParseStepBookmarkName(fs + "#" + shortname)
return parseErr == nil && parsedId == jobID
})
if err != nil {
return errors.Wrapf(err, "step release all: destroy bookmarks older than %q", mostRecent.FullPath(fs))
}
return nil
}
var lastReceivedHoldTagRE = regexp.MustCompile("^zrepl_last_received_J_(.+)$")
// err != nil always means that the bookmark is not a step bookmark
func ParseLastReceivedHoldTag(tag string) (JobID, error) {
match := lastReceivedHoldTagRE.FindStringSubmatch(tag)
if match == nil {
return JobID{}, errors.Errorf("parse last-received-hold tag: does not match regex %s", lastReceivedHoldTagRE.String())
}
jobId, err := MakeJobID(match[1])
if err != nil {
return JobID{}, errors.Wrap(err, "parse last-received-hold tag: invalid job id field")
}
return jobId, nil
}
func LastReceivedHoldTag(jobID JobID) (string, error) {
return lastReceivedHoldImpl(jobID.String())
}
func lastReceivedHoldImpl(jobid string) (string, error) {
tag := fmt.Sprintf("zrepl_last_received_J_%s", jobid)
if err := zfs.ValidHoldTag(tag); err != nil {
return "", err
}
return tag, nil
}
func MoveLastReceivedHold(ctx context.Context, fs string, to zfs.ZFSSendArgVersion, jobID JobID) error {
if err := to.ValidateExists(ctx, fs); err != nil {
return err
}
if err := zfs.EntityNamecheck(to.FullPath(fs), zfs.EntityTypeSnapshot); err != nil {
return err
}
tag, err := LastReceivedHoldTag(jobID)
if err != nil {
return errors.Wrap(err, "last-received-hold: hold tag")
}
// we never want to be without a hold
// => hold new one before releasing old hold
err = zfs.ZFSHold(ctx, fs, to, tag)
if err != nil {
return errors.Wrap(err, "last-received-hold: hold newly received")
}
err = zfs.ZFSReleaseAllOlderThanGUID(ctx, fs, to.GUID, tag)
if err != nil {
return errors.Wrap(err, "last-received-hold: release older holds")
}
return nil
}
type ListHoldsAndBookmarksOutputBookmarkV1ReplicationCursor struct {
FS string
Name string
}
type ListHoldsAndBookmarksOutput struct {
StepBookmarks []*ListHoldsAndBookmarksOutputBookmark
StepHolds []*ListHoldsAndBookmarksOutputHold
ReplicationCursorBookmarks []*ListHoldsAndBookmarksOutputBookmark
V1ReplicationCursors []*ListHoldsAndBookmarksOutputBookmarkV1ReplicationCursor
LastReceivedHolds []*ListHoldsAndBookmarksOutputHold
}
type ListHoldsAndBookmarksOutputBookmark struct {
FS, Name string
Guid uint64
JobID JobID
v zfs.FilesystemVersion
}
type ListHoldsAndBookmarksOutputHold struct {
FS string
Snap string
SnapGuid uint64
SnapCreateTXG uint64
Tag string
JobID JobID
}
// List all holds and bookmarks managed by endpoint
func ListZFSHoldsAndBookmarks(ctx context.Context, fsfilter zfs.DatasetFilter) (*ListHoldsAndBookmarksOutput, error) {
// initialize all fields so that JSON serialization of output looks pretty (see client/holds.go)
// however, listZFSHoldsAndBookmarksImplFS shouldn't rely on it
out := &ListHoldsAndBookmarksOutput{
StepBookmarks: make([]*ListHoldsAndBookmarksOutputBookmark, 0),
StepHolds: make([]*ListHoldsAndBookmarksOutputHold, 0),
ReplicationCursorBookmarks: make([]*ListHoldsAndBookmarksOutputBookmark, 0),
V1ReplicationCursors: make([]*ListHoldsAndBookmarksOutputBookmarkV1ReplicationCursor, 0),
LastReceivedHolds: make([]*ListHoldsAndBookmarksOutputHold, 0),
}
fss, err := zfs.ZFSListMapping(ctx, fsfilter)
if err != nil {
return nil, errors.Wrap(err, "list filesystems")
}
for _, fs := range fss {
err := listZFSHoldsAndBookmarksImplFS(ctx, out, fs)
if err != nil {
return nil, errors.Wrapf(err, "list holds and bookmarks on %q", fs.ToString())
}
}
return out, nil
}
func listZFSHoldsAndBookmarksImplFS(ctx context.Context, out *ListHoldsAndBookmarksOutput, fs *zfs.DatasetPath) error {
fsvs, err := zfs.ZFSListFilesystemVersions(fs, nil)
if err != nil {
return errors.Wrapf(err, "list filesystem versions of %q", fs)
}
for _, v := range fsvs {
switch v.Type {
case zfs.Bookmark:
listZFSHoldsAndBookmarksImplTryParseBookmark(ctx, out, fs, v)
case zfs.Snapshot:
holds, err := zfs.ZFSHolds(ctx, fs.ToString(), v.Name)
if err != nil {
return errors.Wrapf(err, "get holds of %q", v.ToAbsPath(fs))
}
for _, tag := range holds {
listZFSHoldsAndBookmarksImplSnapshotTryParseHold(ctx, out, fs, v, tag)
}
default:
continue
}
}
return nil
}
// pure function, err != nil always indicates parsing error
func listZFSHoldsAndBookmarksImplTryParseBookmark(ctx context.Context, out *ListHoldsAndBookmarksOutput, fs *zfs.DatasetPath, v zfs.FilesystemVersion) {
var err error
if v.Type != zfs.Bookmark {
panic("impl error")
}
fullname := v.ToAbsPath(fs)
bm := &ListHoldsAndBookmarksOutputBookmark{
FS: fs.ToString(), Name: v.Name, v: v,
}
bm.Guid, bm.JobID, err = ParseStepBookmarkName(fullname)
if err == nil {
out.StepBookmarks = append(out.StepBookmarks, bm)
return
}
bm.Guid, bm.JobID, err = ParseReplicationCursorBookmarkName(fullname)
if err == nil {
out.ReplicationCursorBookmarks = append(out.ReplicationCursorBookmarks, bm)
return
} else if err == ErrV1ReplicationCursor {
v1rc := &ListHoldsAndBookmarksOutputBookmarkV1ReplicationCursor{
FS: fs.ToString(), Name: v.Name,
}
out.V1ReplicationCursors = append(out.V1ReplicationCursors, v1rc)
return
}
}
// pure function, err != nil always indicates parsing error
func listZFSHoldsAndBookmarksImplSnapshotTryParseHold(ctx context.Context, out *ListHoldsAndBookmarksOutput, fs *zfs.DatasetPath, v zfs.FilesystemVersion, holdTag string) {
var err error
if v.Type != zfs.Snapshot {
panic("impl error")
}
hold := &ListHoldsAndBookmarksOutputHold{
FS: fs.ToString(),
Snap: v.Name,
SnapGuid: v.Guid,
SnapCreateTXG: v.CreateTXG,
Tag: holdTag,
}
hold.JobID, err = ParseStepHoldTag(holdTag)
if err == nil {
out.StepHolds = append(out.StepHolds, hold)
return
}
hold.JobID, err = ParseLastReceivedHoldTag(holdTag)
if err == nil {
out.LastReceivedHolds = append(out.LastReceivedHolds, hold)
return
}
}

View File

@ -0,0 +1,530 @@
package endpoint
import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/zfs"
)
type AbstractionType string
// Implementation note:
// There are a lot of exhaustive switches on AbstractionType in the code base.
// When adding a new abstraction type, make sure to search and update them!
const (
AbstractionStepBookmark AbstractionType = "step-bookmark"
AbstractionStepHold AbstractionType = "step-hold"
AbstractionLastReceivedHold AbstractionType = "last-received-hold"
AbstractionReplicationCursorBookmarkV1 AbstractionType = "replication-cursor-bookmark-v1"
AbstractionReplicationCursorBookmarkV2 AbstractionType = "replication-cursor-bookmark-v2"
)
var AbstractionTypesAll = map[AbstractionType]bool{
AbstractionStepBookmark: true,
AbstractionStepHold: true,
AbstractionLastReceivedHold: true,
AbstractionReplicationCursorBookmarkV1: true,
AbstractionReplicationCursorBookmarkV2: true,
}
// Implementation Note:
// Whenever you add a new accessor, adjust AbstractionJSON.MarshalJSON accordingly
type Abstraction interface {
GetType() AbstractionType
GetFS() string
GetName() string
GetFullPath() string
GetJobID() *JobID // may return nil if the abstraction does not have a JobID
GetCreateTXG() uint64
GetFilesystemVersion() zfs.FilesystemVersion
String() string
// destroy the abstraction: either releases the hold or destroys the bookmark
Destroy(context.Context) error
json.Marshaler
}
func (t AbstractionType) Validate() error {
switch t {
case AbstractionStepBookmark:
return nil
case AbstractionStepHold:
return nil
case AbstractionLastReceivedHold:
return nil
case AbstractionReplicationCursorBookmarkV1:
return nil
case AbstractionReplicationCursorBookmarkV2:
return nil
default:
return errors.Errorf("unknown abstraction type %q", t)
}
}
func (t AbstractionType) MustValidate() error {
if err := t.Validate(); err != nil {
panic(err)
}
return nil
}
// Number of instances of this abstraction type that are live (not stale)
// per (FS,JobID). -1 for infinity.
func (t AbstractionType) NumLivePerFsAndJob() int {
switch t {
case AbstractionStepBookmark:
return 2
case AbstractionStepHold:
return 2
case AbstractionLastReceivedHold:
return 1
case AbstractionReplicationCursorBookmarkV1:
return -1
case AbstractionReplicationCursorBookmarkV2:
return 1
default:
panic(t)
}
}
type AbstractionJSON struct{ Abstraction }
var _ json.Marshaler = (*AbstractionJSON)(nil)
func (a AbstractionJSON) MarshalJSON() ([]byte, error) {
type S struct {
Type AbstractionType
FS string
Name string
FullPath string
JobID *JobID // may return nil if the abstraction does not have a JobID
CreateTXG uint64
FilesystemVersion zfs.FilesystemVersion
String string
}
v := S{
Type: a.Abstraction.GetType(),
FS: a.Abstraction.GetFS(),
Name: a.Abstraction.GetName(),
FullPath: a.Abstraction.GetFullPath(),
JobID: a.Abstraction.GetJobID(),
CreateTXG: a.Abstraction.GetCreateTXG(),
FilesystemVersion: a.Abstraction.GetFilesystemVersion(),
String: a.Abstraction.String(),
}
return json.Marshal(v)
}
type AbstractionTypeSet map[AbstractionType]bool
func AbstractionTypeSetFromStrings(sts []string) (AbstractionTypeSet, error) {
ats := make(map[AbstractionType]bool, len(sts))
for i, t := range sts {
at := AbstractionType(t)
if err := at.Validate(); err != nil {
return nil, errors.Wrapf(err, "invalid abstraction type #%d %q", i+1, t)
}
ats[at] = true
}
return ats, nil
}
func (s AbstractionTypeSet) String() string {
sts := make([]string, 0, len(s))
for i := range s {
sts = append(sts, string(i))
}
sts = sort.StringSlice(sts)
return strings.Join(sts, ",")
}
func (s AbstractionTypeSet) Validate() error {
for k := range s {
if err := k.Validate(); err != nil {
return err
}
}
return nil
}
type BookmarkExtractor func(fs *zfs.DatasetPath, v zfs.FilesystemVersion) Abstraction
// returns nil if the abstraction type is not bookmark-based
func (t AbstractionType) BookmarkExtractor() BookmarkExtractor {
switch t {
case AbstractionStepBookmark:
return StepBookmarkExtractor
case AbstractionReplicationCursorBookmarkV1:
return ReplicationCursorV1Extractor
case AbstractionReplicationCursorBookmarkV2:
return ReplicationCursorV2Extractor
case AbstractionStepHold:
return nil
case AbstractionLastReceivedHold:
return nil
default:
panic(fmt.Sprintf("unimpl: %q", t))
}
}
type HoldExtractor = func(fs *zfs.DatasetPath, v zfs.FilesystemVersion, tag string) Abstraction
// returns nil if the abstraction type is not hold-based
func (t AbstractionType) HoldExtractor() HoldExtractor {
switch t {
case AbstractionStepBookmark:
return nil
case AbstractionReplicationCursorBookmarkV1:
return nil
case AbstractionReplicationCursorBookmarkV2:
return nil
case AbstractionStepHold:
return StepHoldExtractor
case AbstractionLastReceivedHold:
return LastReceivedHoldExtractor
default:
panic(fmt.Sprintf("unimpl: %q", t))
}
}
type ListZFSHoldsAndBookmarksQuery struct {
FS ListZFSHoldsAndBookmarksQueryFilesystemFilter
// What abstraction types should match (any contained in the set)
What AbstractionTypeSet
// The output for the query must satisfy _all_ (AND) requirements of all fields in this query struct.
// if not nil: JobID of the hold or bookmark in question must be equal
// else: JobID of the hold or bookmark can be any value
JobID *JobID
// if not nil: The hold's snapshot or the bookmark's createtxg must be less than (or equal) Until
// else: CreateTXG of the hold or bookmark can be any value
Until *InclusiveExclusiveCreateTXG
// TODO
// Concurrent: uint > 0
}
type InclusiveExclusiveCreateTXG struct {
CreateTXG uint64
Inclusive *zfs.NilBool // must not be nil
}
// FS == nil XOR Filter == nil
type ListZFSHoldsAndBookmarksQueryFilesystemFilter struct {
FS *string
Filter zfs.DatasetFilter
}
func (q *ListZFSHoldsAndBookmarksQuery) Validate() error {
if err := q.FS.Validate(); err != nil {
return errors.Wrap(err, "FS")
}
if q.JobID != nil {
q.JobID.MustValidate() // FIXME
}
if q.Until != nil {
if err := q.Until.Validate(); err != nil {
return errors.Wrap(err, "Until")
}
}
if err := q.What.Validate(); err != nil {
return err
}
return nil
}
var zreplEndpointListAbstractionsQueryCreatetxg0Allowed = envconst.Bool("ZREPL_ENDPOINT_LIST_ABSTRACTIONS_QUERY_CREATETXG_0_ALLOWED", false)
func (i *InclusiveExclusiveCreateTXG) Validate() error {
if err := i.Inclusive.Validate(); err != nil {
return errors.Wrap(err, "Inclusive")
}
if i.CreateTXG == 0 && !zreplEndpointListAbstractionsQueryCreatetxg0Allowed {
return errors.New("CreateTXG must be non-zero")
}
return nil
}
func (f *ListZFSHoldsAndBookmarksQueryFilesystemFilter) Validate() error {
if f == nil {
return nil
}
fsSet := f.FS != nil
filterSet := f.Filter != nil
if fsSet && filterSet || !fsSet && !filterSet {
return fmt.Errorf("must set FS or Filter field, but fsIsSet=%v and filterIsSet=%v", fsSet, filterSet)
}
if fsSet {
if err := zfs.EntityNamecheck(*f.FS, zfs.EntityTypeFilesystem); err != nil {
return errors.Wrap(err, "FS invalid")
}
}
return nil
}
func (f *ListZFSHoldsAndBookmarksQueryFilesystemFilter) Filesystems(ctx context.Context) ([]string, error) {
if err := f.Validate(); err != nil {
panic(err)
}
if f.FS != nil {
return []string{*f.FS}, nil
}
if f.Filter != nil {
dps, err := zfs.ZFSListMapping(ctx, f.Filter)
if err != nil {
return nil, err
}
fss := make([]string, len(dps))
for i, dp := range dps {
fss[i] = dp.ToString()
}
return fss, nil
}
panic("unreachable")
}
type ListAbstractionsError struct {
FS string
Snap string
What string
Err error
}
func (e ListAbstractionsError) Error() string {
if e.FS == "" {
return fmt.Sprintf("list endpoint abstractions: %s: %s", e.What, e.Err)
} else {
v := e.FS
if e.Snap != "" {
v = fmt.Sprintf("%s@%s", e.FS, e.Snap)
}
return fmt.Sprintf("list endpoint abstractions on %q: %s: %s", v, e.What, e.Err)
}
}
type putListAbstractionErr func(err error, fs string, what string)
type putListAbstraction func(a Abstraction)
type ListAbstractionsErrors []ListAbstractionsError
func (e ListAbstractionsErrors) Error() string {
if len(e) == 0 {
panic(e)
}
if len(e) == 1 {
return fmt.Sprintf("list endpoint abstractions: %s", e[0])
}
msgs := make([]string, len(e))
for i := range e {
msgs[i] = e.Error()
}
return fmt.Sprintf("list endpoint abstractions: multiple errors:\n%s", strings.Join(msgs, "\n"))
}
func ListAbstractions(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (out []Abstraction, outErrs []ListAbstractionsError, err error) {
// impl note: structure the query processing in such a way that
// a minimum amount of zfs shell-outs needs to be done
if err := query.Validate(); err != nil {
return nil, nil, errors.Wrap(err, "validate query")
}
fss, err := query.FS.Filesystems(ctx)
if err != nil {
return nil, nil, errors.Wrap(err, "list filesystems")
}
errCb := func(err error, fs string, what string) {
outErrs = append(outErrs, ListAbstractionsError{Err: err, FS: fs, What: what})
}
emitAbstraction := func(a Abstraction) {
jobIdMatches := query.JobID == nil || a.GetJobID() == nil || *a.GetJobID() == *query.JobID
untilMatches := query.Until == nil
if query.Until != nil {
if query.Until.Inclusive.B {
untilMatches = a.GetCreateTXG() <= query.Until.CreateTXG
} else {
untilMatches = a.GetCreateTXG() < query.Until.CreateTXG
}
}
if jobIdMatches && untilMatches {
out = append(out, a)
}
}
for _, fs := range fss {
listAbstractionsImplFS(ctx, fs, &query, emitAbstraction, errCb)
}
return out, outErrs, nil
}
func listAbstractionsImplFS(ctx context.Context, fs string, query *ListZFSHoldsAndBookmarksQuery, emitCandidate putListAbstraction, errCb putListAbstractionErr) {
fsp, err := zfs.NewDatasetPath(fs)
if err != nil {
panic(err)
}
if len(query.What) == 0 {
return
}
// we need filesystem versions for any abstraction type
fsvs, err := zfs.ZFSListFilesystemVersions(fsp, nil)
if err != nil {
errCb(err, fs, "list filesystem versions")
return
}
for at := range query.What {
bmE := at.BookmarkExtractor()
holdE := at.HoldExtractor()
if bmE == nil && holdE == nil || bmE != nil && holdE != nil {
panic("implementation error: extractors misconfigured for " + at)
}
for _, v := range fsvs {
var a Abstraction
if v.Type == zfs.Bookmark && bmE != nil {
a = bmE(fsp, v)
}
if v.Type == zfs.Snapshot && holdE != nil {
holds, err := zfs.ZFSHolds(ctx, fsp.ToString(), v.Name)
if err != nil {
errCb(err, v.ToAbsPath(fsp), "get hold on snap")
continue
}
for _, tag := range holds {
a = holdE(fsp, v, tag)
}
}
if a != nil {
emitCandidate(a)
}
}
}
}
type BatchDestroyResult struct {
Abstraction
DestroyErr error
}
var _ json.Marshaler = (*BatchDestroyResult)(nil)
func (r BatchDestroyResult) MarshalJSON() ([]byte, error) {
err := ""
if r.DestroyErr != nil {
err = r.DestroyErr.Error()
}
s := struct {
Abstraction AbstractionJSON
DestroyErr string
}{
AbstractionJSON{r.Abstraction},
err,
}
return json.Marshal(s)
}
func BatchDestroy(ctx context.Context, abs []Abstraction) <-chan BatchDestroyResult {
// hold-based batching: per snapshot
// bookmark-based batching: none possible via CLI
// => not worth the trouble for now, will be worth it once we start using channel programs
// => TODO: actual batching using channel programs
res := make(chan BatchDestroyResult, len(abs))
go func() {
for _, a := range abs {
res <- BatchDestroyResult{
a,
a.Destroy(ctx),
}
}
close(res)
}()
return res
}
type StalenessInfo struct {
ConstructedWithQuery ListZFSHoldsAndBookmarksQuery
All []Abstraction
Live []Abstraction
Stale []Abstraction
}
func ListStale(ctx context.Context, q ListZFSHoldsAndBookmarksQuery) (*StalenessInfo, error) {
if q.Until != nil {
// we must determine the most recent step per FS, can't allow that
return nil, errors.New("ListStale cannot have Until != nil set on query")
}
abs, absErr, err := ListAbstractions(ctx, q)
if err != nil {
return nil, err
}
if len(absErr) > 0 {
// can't go on here because we can't determine the most recent step
return nil, ListAbstractionsErrors(absErr)
}
si := listStaleFiltering(abs)
si.ConstructedWithQuery = q
return si, nil
}
// The last AbstractionType.NumLive() step holds per (FS,Job,AbstractionType) are live
// others are stale.
//
// the returned StalenessInfo.ConstructedWithQuery is not set
func listStaleFiltering(abs []Abstraction) *StalenessInfo {
type fsAjobAtype struct {
FS string
Job JobID
Type AbstractionType
}
var noJobId []Abstraction
by := make(map[fsAjobAtype][]Abstraction)
for _, a := range abs {
if a.GetJobID() == nil {
noJobId = append(noJobId, a)
continue
}
faj := fsAjobAtype{a.GetFS(), *a.GetJobID(), a.GetType()}
l := by[faj]
l = append(l, a)
by[faj] = l
}
ret := &StalenessInfo{
All: abs,
Live: noJobId,
Stale: []Abstraction{},
}
// sort descending (highest createtxg first), then cut off
for k := range by {
l := by[k]
sort.Slice(l, func(i, j int) bool {
return l[i].GetCreateTXG() > l[j].GetCreateTXG()
})
cutoff := k.Type.NumLivePerFsAndJob()
if cutoff == -1 || len(l) <= cutoff {
ret.Live = append(ret.Live, l...)
} else {
ret.Live = append(ret.Live, l[0:cutoff]...)
ret.Stale = append(ret.Stale, l[cutoff:]...)
}
}
return ret
}

View File

@ -0,0 +1,377 @@
package endpoint
import (
"context"
"encoding/json"
"fmt"
"regexp"
"sort"
"github.com/kr/pretty"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/util/errorarray"
"github.com/zrepl/zrepl/zfs"
)
const replicationCursorBookmarkNamePrefix = "zrepl_CURSOR"
func ReplicationCursorBookmarkName(fs string, guid uint64, id JobID) (string, error) {
return replicationCursorBookmarkNameImpl(fs, guid, id.String())
}
func replicationCursorBookmarkNameImpl(fs string, guid uint64, jobid string) (string, error) {
return makeJobAndGuidBookmarkName(replicationCursorBookmarkNamePrefix, fs, guid, jobid)
}
var ErrV1ReplicationCursor = fmt.Errorf("bookmark name is a v1-replication cursor")
//err != nil always means that the bookmark is not a valid replication bookmark
//
// Returns ErrV1ReplicationCursor as error if the bookmark is a v1 replication cursor
func ParseReplicationCursorBookmarkName(fullname string) (uint64, JobID, error) {
// check for legacy cursors
{
if err := zfs.EntityNamecheck(fullname, zfs.EntityTypeBookmark); err != nil {
return 0, JobID{}, errors.Wrap(err, "parse replication cursor bookmark name")
}
_, _, name, err := zfs.DecomposeVersionString(fullname)
if err != nil {
return 0, JobID{}, errors.Wrap(err, "parse replication cursor bookmark name: decompose version string")
}
const V1ReplicationCursorBookmarkName = "zrepl_replication_cursor"
if name == V1ReplicationCursorBookmarkName {
return 0, JobID{}, ErrV1ReplicationCursor
}
// fallthrough to main parser
}
guid, jobID, err := parseJobAndGuidBookmarkName(fullname, replicationCursorBookmarkNamePrefix)
if err != nil {
err = errors.Wrap(err, "parse replication cursor bookmark name") // no shadow
}
return guid, jobID, err
}
// may return nil for both values, indicating there is no cursor
func GetMostRecentReplicationCursorOfJob(ctx context.Context, fs string, jobID JobID) (*zfs.FilesystemVersion, error) {
fsp, err := zfs.NewDatasetPath(fs)
if err != nil {
return nil, err
}
candidates, err := GetReplicationCursors(ctx, fsp, jobID)
if err != nil || len(candidates) == 0 {
return nil, err
}
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].CreateTXG < candidates[j].CreateTXG
})
mostRecent := candidates[len(candidates)-1]
return &mostRecent, nil
}
func GetReplicationCursors(ctx context.Context, dp *zfs.DatasetPath, jobID JobID) ([]zfs.FilesystemVersion, error) {
fs := dp.ToString()
q := ListZFSHoldsAndBookmarksQuery{
FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{FS: &fs},
What: map[AbstractionType]bool{
AbstractionReplicationCursorBookmarkV1: true,
AbstractionReplicationCursorBookmarkV2: true,
},
JobID: &jobID,
Until: nil,
}
abs, absErr, err := ListAbstractions(ctx, q)
if err != nil {
return nil, errors.Wrap(err, "get replication cursor: list bookmarks and holds")
}
if len(absErr) > 0 {
return nil, ListAbstractionsErrors(absErr)
}
var v1, v2 []Abstraction
for _, a := range abs {
switch a.GetType() {
case AbstractionReplicationCursorBookmarkV1:
v1 = append(v1, a)
case AbstractionReplicationCursorBookmarkV2:
v2 = append(v2, a)
default:
panic("unexpected abstraction: " + a.GetType())
}
}
if len(v1) > 0 {
getLogger(ctx).WithField("bookmark", pretty.Sprint(v1)).
Warn("found v1-replication cursor bookmarks, consider running migration 'replication-cursor:v1-v2' after successful replication with this zrepl version")
}
candidates := make([]zfs.FilesystemVersion, 0)
for _, v := range v2 {
candidates = append(candidates, v.GetFilesystemVersion())
}
return candidates, nil
}
type ReplicationCursorTarget interface {
IsSnapshot() bool
GetGuid() uint64
GetCreateTXG() uint64
ToSendArgVersion() zfs.ZFSSendArgVersion
}
// `target` is validated before replication cursor is set. if validation fails, the cursor is not moved.
//
// returns ErrBookmarkCloningNotSupported if version is a bookmark and bookmarking bookmarks is not supported by ZFS
func MoveReplicationCursor(ctx context.Context, fs string, target ReplicationCursorTarget, jobID JobID) (destroyedCursors []Abstraction, err error) {
if !target.IsSnapshot() {
return nil, zfs.ErrBookmarkCloningNotSupported
}
bookmarkname, err := ReplicationCursorBookmarkName(fs, target.GetGuid(), jobID)
if err != nil {
return nil, errors.Wrap(err, "determine replication cursor name")
}
// idempotently create bookmark (guid is encoded in it, hence we'll most likely add a new one
// cleanup the old one afterwards
err = zfs.ZFSBookmark(fs, target.ToSendArgVersion(), bookmarkname)
if err != nil {
if err == zfs.ErrBookmarkCloningNotSupported {
return nil, err // TODO go1.13 use wrapping
}
return nil, errors.Wrapf(err, "cannot create bookmark")
}
destroyedCursors, err = DestroyObsoleteReplicationCursors(ctx, fs, target, jobID)
if err != nil {
return nil, errors.Wrap(err, "destroy obsolete replication cursors")
}
return destroyedCursors, nil
}
type ReplicationCursor interface {
GetCreateTXG() uint64
}
func DestroyObsoleteReplicationCursors(ctx context.Context, fs string, current ReplicationCursor, jobID JobID) (_ []Abstraction, err error) {
q := ListZFSHoldsAndBookmarksQuery{
FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{
FS: &fs,
},
What: AbstractionTypeSet{
AbstractionReplicationCursorBookmarkV2: true,
},
JobID: &jobID,
Until: &InclusiveExclusiveCreateTXG{
CreateTXG: current.GetCreateTXG(),
Inclusive: &zfs.NilBool{B: false},
},
}
abs, absErr, err := ListAbstractions(ctx, q)
if err != nil {
return nil, errors.Wrap(err, "list abstractions")
}
if len(absErr) > 0 {
return nil, errors.Wrap(ListAbstractionsErrors(absErr), "list abstractions")
}
var destroyed []Abstraction
var errs []error
for res := range BatchDestroy(ctx, abs) {
log := getLogger(ctx).
WithField("replication_cursor_bookmark", res.Abstraction)
if res.DestroyErr != nil {
errs = append(errs, res.DestroyErr)
log.WithError(err).
Error("cannot destroy obsolete replication cursor bookmark")
} else {
destroyed = append(destroyed, res.Abstraction)
log.Info("destroyed obsolete replication cursor bookmark")
}
}
if len(errs) == 0 {
return destroyed, nil
} else {
return destroyed, errorarray.Wrap(errs, "destroy obsolete replication cursor")
}
}
var lastReceivedHoldTagRE = regexp.MustCompile("^zrepl_last_received_J_(.+)$")
// err != nil always means that the bookmark is not a step bookmark
func ParseLastReceivedHoldTag(tag string) (JobID, error) {
match := lastReceivedHoldTagRE.FindStringSubmatch(tag)
if match == nil {
return JobID{}, errors.Errorf("parse last-received-hold tag: does not match regex %s", lastReceivedHoldTagRE.String())
}
jobId, err := MakeJobID(match[1])
if err != nil {
return JobID{}, errors.Wrap(err, "parse last-received-hold tag: invalid job id field")
}
return jobId, nil
}
func LastReceivedHoldTag(jobID JobID) (string, error) {
return lastReceivedHoldImpl(jobID.String())
}
func lastReceivedHoldImpl(jobid string) (string, error) {
tag := fmt.Sprintf("zrepl_last_received_J_%s", jobid)
if err := zfs.ValidHoldTag(tag); err != nil {
return "", err
}
return tag, nil
}
func MoveLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersion, jobID JobID) error {
if to.IsSnapshot() {
return errors.Errorf("last-received-hold: target must be a snapshot: %s", to.FullPath(fs))
}
tag, err := LastReceivedHoldTag(jobID)
if err != nil {
return errors.Wrap(err, "last-received-hold: hold tag")
}
// we never want to be without a hold
// => hold new one before releasing old hold
err = zfs.ZFSHold(ctx, fs, to, tag)
if err != nil {
return errors.Wrap(err, "last-received-hold: hold newly received")
}
q := ListZFSHoldsAndBookmarksQuery{
What: AbstractionTypeSet{
AbstractionLastReceivedHold: true,
},
FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{
FS: &fs,
},
JobID: &jobID,
Until: &InclusiveExclusiveCreateTXG{
CreateTXG: to.GetCreateTXG(),
Inclusive: &zfs.NilBool{B: false},
},
}
abs, absErrs, err := ListAbstractions(ctx, q)
if err != nil {
return errors.Wrap(err, "last-received-hold: list")
}
if len(absErrs) > 0 {
return errors.Wrap(ListAbstractionsErrors(absErrs), "last-received-hold: list")
}
getLogger(ctx).WithField("last-received-holds", fmt.Sprintf("%s", abs)).Debug("releasing last-received-holds")
var errs []error
for res := range BatchDestroy(ctx, abs) {
log := getLogger(ctx).
WithField("last-received-hold", res.Abstraction)
if res.DestroyErr != nil {
errs = append(errs, res.DestroyErr)
log.WithError(err).
Error("cannot release last-received-hold")
} else {
log.Info("released last-received-hold")
}
}
if len(errs) == 0 {
return nil
} else {
return errorarray.Wrap(errs, "last-received-hold: release")
}
}
func ReplicationCursorV2Extractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction) {
if v.Type != zfs.Bookmark {
panic("impl error")
}
fullname := v.ToAbsPath(fs)
guid, jobid, err := ParseReplicationCursorBookmarkName(fullname)
if err == nil {
if guid != v.Guid {
// TODO log this possibly tinkered-with bookmark
return nil
}
return &ListHoldsAndBookmarksOutputBookmark{
Type: AbstractionReplicationCursorBookmarkV2,
FS: fs.ToString(),
FilesystemVersion: v,
JobID: jobid,
}
}
return nil
}
func ReplicationCursorV1Extractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction) {
if v.Type != zfs.Bookmark {
panic("impl error")
}
fullname := v.ToAbsPath(fs)
_, _, err := ParseReplicationCursorBookmarkName(fullname)
if err == ErrV1ReplicationCursor {
return &ReplicationCursorV1{
Type: AbstractionReplicationCursorBookmarkV1,
FS: fs.ToString(),
FilesystemVersion: v,
}
}
return nil
}
var _ HoldExtractor = LastReceivedHoldExtractor
func LastReceivedHoldExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion, holdTag string) Abstraction {
var err error
if v.Type != zfs.Snapshot {
panic("impl error")
}
jobID, err := ParseLastReceivedHoldTag(holdTag)
if err == nil {
return &ListHoldsAndBookmarksOutputHold{
Type: AbstractionLastReceivedHold,
FS: fs.ToString(),
FilesystemVersion: v,
Tag: holdTag,
JobID: jobID,
}
}
return nil
}
type ReplicationCursorV1 struct {
Type AbstractionType
FS string
zfs.FilesystemVersion
}
func (c ReplicationCursorV1) GetType() AbstractionType { return c.Type }
func (c ReplicationCursorV1) GetFS() string { return c.FS }
func (c ReplicationCursorV1) GetFullPath() string { return fmt.Sprintf("%s#%s", c.FS, c.GetName()) }
func (c ReplicationCursorV1) GetJobID() *JobID { return nil }
func (c ReplicationCursorV1) GetFilesystemVersion() zfs.FilesystemVersion { return c.FilesystemVersion }
func (c ReplicationCursorV1) MarshalJSON() ([]byte, error) {
return json.Marshal(AbstractionJSON{c})
}
func (c ReplicationCursorV1) String() string {
return fmt.Sprintf("%s %s", c.Type, c.GetFullPath())
}
func (c ReplicationCursorV1) Destroy(ctx context.Context) error {
if err := zfs.ZFSDestroyIdempotent(c.GetFullPath()); err != nil {
return errors.Wrapf(err, "destroy %s %s: zfs", c.Type, c.GetFullPath())
}
return nil
}

View File

@ -0,0 +1,266 @@
package endpoint
import (
"context"
"fmt"
"regexp"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/util/errorarray"
"github.com/zrepl/zrepl/zfs"
)
var stepHoldTagRE = regexp.MustCompile("^zrepl_STEP_J_(.+)")
func StepHoldTag(jobid JobID) (string, error) {
return stepHoldTagImpl(jobid.String())
}
func stepHoldTagImpl(jobid string) (string, error) {
t := fmt.Sprintf("zrepl_STEP_J_%s", jobid)
if err := zfs.ValidHoldTag(t); err != nil {
return "", err
}
return t, nil
}
// err != nil always means that the bookmark is not a step bookmark
func ParseStepHoldTag(tag string) (JobID, error) {
match := stepHoldTagRE.FindStringSubmatch(tag)
if match == nil {
return JobID{}, fmt.Errorf("parse hold tag: match regex %q", stepHoldTagRE)
}
jobID, err := MakeJobID(match[1])
if err != nil {
return JobID{}, errors.Wrap(err, "parse hold tag: invalid job id field")
}
return jobID, nil
}
const stepBookmarkNamePrefix = "zrepl_STEP"
// v must be validated by caller
func StepBookmarkName(fs string, guid uint64, id JobID) (string, error) {
return stepBookmarkNameImpl(fs, guid, id.String())
}
func stepBookmarkNameImpl(fs string, guid uint64, jobid string) (string, error) {
return makeJobAndGuidBookmarkName(stepBookmarkNamePrefix, fs, guid, jobid)
}
// name is the full bookmark name, including dataset path
//
// err != nil always means that the bookmark is not a step bookmark
func ParseStepBookmarkName(fullname string) (guid uint64, jobID JobID, err error) {
guid, jobID, err = parseJobAndGuidBookmarkName(fullname, stepBookmarkNamePrefix)
if err != nil {
err = errors.Wrap(err, "parse step bookmark name") // no shadow!
}
return guid, jobID, err
}
// idempotently hold / step-bookmark `version`
//
// returns ErrBookmarkCloningNotSupported if version is a bookmark and bookmarking bookmarks is not supported by ZFS
func HoldStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID JobID) error {
if v.IsSnapshot() {
tag, err := StepHoldTag(jobID)
if err != nil {
return errors.Wrap(err, "step hold tag")
}
if err := zfs.ZFSHold(ctx, fs, v, tag); err != nil {
return errors.Wrap(err, "step hold: zfs")
}
return nil
}
if !v.IsBookmark() {
panic(fmt.Sprintf("version must bei either snapshot or bookmark, got %#v", v))
}
bmname, err := StepBookmarkName(fs, v.Guid, jobID)
if err != nil {
return errors.Wrap(err, "create step bookmark: determine bookmark name")
}
// idempotently create bookmark
err = zfs.ZFSBookmark(fs, v.ToSendArgVersion(), bmname)
if err != nil {
if err == zfs.ErrBookmarkCloningNotSupported {
// TODO we could actually try to find a local snapshot that has the requested GUID
// however, the replication algorithm prefers snapshots anyways, so this quest
// is most likely not going to be successful. Also, there's the possibility that
// the caller might want to filter what snapshots are eligibile, and this would
// complicate things even further.
return err // TODO go1.13 use wrapping
}
return errors.Wrap(err, "create step bookmark: zfs")
}
return nil
}
// idempotently release the step-hold on v if v is a snapshot
// or idempotently destroy the step-bookmark of v if v is a bookmark
//
// note that this operation leaves v itself untouched, unless v is the step-bookmark itself, in which case v is destroyed
//
// returns an instance of *zfs.DatasetDoesNotExist if `v` does not exist
func ReleaseStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID JobID) error {
if v.IsSnapshot() {
tag, err := StepHoldTag(jobID)
if err != nil {
return errors.Wrap(err, "step release tag")
}
if err := zfs.ZFSRelease(ctx, tag, v.FullPath(fs)); err != nil {
return errors.Wrap(err, "step release: zfs")
}
return nil
}
if !v.IsBookmark() {
panic(fmt.Sprintf("impl error: expecting version to be a bookmark, got %#v", v))
}
bmname, err := StepBookmarkName(fs, v.Guid, jobID)
if err != nil {
return errors.Wrap(err, "step release: determine bookmark name")
}
// idempotently destroy bookmark
if err := zfs.ZFSDestroyIdempotent(bmname); err != nil {
return errors.Wrap(err, "step release: bookmark destroy: zfs")
}
return nil
}
// release {step holds, step bookmarks} earlier and including `mostRecent`
func ReleaseStepCummulativeInclusive(ctx context.Context, fs string, mostRecent zfs.FilesystemVersion, jobID JobID) error {
q := ListZFSHoldsAndBookmarksQuery{
What: AbstractionTypeSet{
AbstractionStepHold: true,
AbstractionStepBookmark: true,
},
FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{
FS: &fs,
},
JobID: &jobID,
Until: &InclusiveExclusiveCreateTXG{
CreateTXG: mostRecent.CreateTXG,
Inclusive: &zfs.NilBool{B: true},
},
}
abs, absErrs, err := ListAbstractions(ctx, q)
if err != nil {
return errors.Wrap(err, "step release cummulative: list")
}
if len(absErrs) > 0 {
return errors.Wrap(ListAbstractionsErrors(absErrs), "step release cummulative: list")
}
getLogger(ctx).WithField("step_holds_and_bookmarks", fmt.Sprintf("%s", abs)).Debug("releasing step holds and bookmarks")
var errs []error
for res := range BatchDestroy(ctx, abs) {
log := getLogger(ctx).
WithField("step_hold_or_bookmark", res.Abstraction)
if res.DestroyErr != nil {
errs = append(errs, res.DestroyErr)
log.WithError(err).
Error("cannot release step hold or bookmark")
} else {
log.Info("released step hold or bookmark")
}
}
if len(errs) == 0 {
return nil
} else {
return errorarray.Wrap(errs, "step release cummulative: release")
}
}
func TryReleaseStepStaleFS(ctx context.Context, fs string, jobID JobID) {
q := ListZFSHoldsAndBookmarksQuery{
FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{
FS: &fs,
},
JobID: &jobID,
What: AbstractionTypeSet{
AbstractionStepHold: true,
AbstractionStepBookmark: true,
},
}
staleness, err := ListStale(ctx, q)
if err != nil {
getLogger(ctx).WithError(err).Error("cannot list stale step holds and bookmarks")
return
}
for _, s := range staleness.Stale {
getLogger(ctx).WithField("stale_step_hold_or_bookmark", s).Info("batch-destroying stale step hold or bookmark")
}
for res := range BatchDestroy(ctx, staleness.Stale) {
if res.DestroyErr != nil {
getLogger(ctx).
WithField("stale_step_hold_or_bookmark", res.Abstraction).
WithError(res.DestroyErr).
Error("cannot destroy stale step-hold or bookmark")
} else {
getLogger(ctx).
WithField("stale_step_hold_or_bookmark", res.Abstraction).
WithError(res.DestroyErr).
Info("destroyed stale step-hold or bookmark")
}
}
}
var _ BookmarkExtractor = StepBookmarkExtractor
func StepBookmarkExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction) {
if v.Type != zfs.Bookmark {
panic("impl error")
}
fullname := v.ToAbsPath(fs)
guid, jobid, err := ParseStepBookmarkName(fullname)
if guid != v.Guid {
// TODO log this possibly tinkered-with bookmark
return nil
}
if err == nil {
bm := &ListHoldsAndBookmarksOutputBookmark{
Type: AbstractionStepBookmark,
FS: fs.ToString(),
FilesystemVersion: v,
JobID: jobid,
}
return bm
}
return nil
}
var _ HoldExtractor = StepHoldExtractor
func StepHoldExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion, holdTag string) Abstraction {
if v.Type != zfs.Snapshot {
panic("impl error")
}
jobID, err := ParseStepHoldTag(holdTag)
if err == nil {
return &ListHoldsAndBookmarksOutputHold{
Type: AbstractionStepHold,
FS: fs.ToString(),
Tag: holdTag,
FilesystemVersion: v,
JobID: jobID,
}
}
return nil
}

View File

@ -1,7 +1,6 @@
package endpoint
import (
"context"
"fmt"
"regexp"
"strconv"
@ -57,53 +56,3 @@ func parseJobAndGuidBookmarkName(fullname string, prefix string) (guid uint64, j
return guid, jobID, nil
}
func destroyBookmarksOlderThan(ctx context.Context, fs string, mostRecent *zfs.ZFSSendArgVersion, jobID JobID, filter func(shortname string) (accept bool)) (destroyed []zfs.FilesystemVersion, err error) {
if filter == nil {
panic(filter)
}
fsp, err := zfs.NewDatasetPath(fs)
if err != nil {
return nil, errors.Wrap(err, "invalid filesystem path")
}
mostRecentProps, err := mostRecent.ValidateExistsAndGetCheckedProps(ctx, fs)
if err != nil {
return nil, errors.Wrap(err, "validate most recent version argument")
}
stepBookmarks, err := zfs.ZFSListFilesystemVersions(fsp, zfs.FilterFromClosure(
func(t zfs.VersionType, name string) (accept bool, err error) {
if t != zfs.Bookmark {
return false, nil
}
return filter(name), nil
}))
if err != nil {
return nil, errors.Wrap(err, "list bookmarks")
}
// cut off all bookmarks prior to mostRecent's CreateTXG
var destroy []zfs.FilesystemVersion
for _, v := range stepBookmarks {
if v.Type != zfs.Bookmark {
panic("implementation error")
}
if !filter(v.Name) {
panic("inconsistent filter result")
}
if v.CreateTXG < mostRecentProps.CreateTXG {
destroy = append(destroy, v)
}
}
// FIXME use batch destroy, must adopt code to handle bookmarks
for _, v := range destroy {
if err := zfs.ZFSDestroyIdempotent(v.ToAbsPath(fsp)); err != nil {
return nil, errors.Wrap(err, "destroy bookmark")
}
}
return destroy, nil
}

View File

@ -0,0 +1,73 @@
package endpoint
import (
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/zfs"
)
type ListHoldsAndBookmarksOutputBookmark struct {
Type AbstractionType
FS string
zfs.FilesystemVersion
JobID JobID
}
func (b ListHoldsAndBookmarksOutputBookmark) GetType() AbstractionType { return b.Type }
func (b ListHoldsAndBookmarksOutputBookmark) GetFS() string { return b.FS }
func (b ListHoldsAndBookmarksOutputBookmark) GetJobID() *JobID { return &b.JobID }
func (b ListHoldsAndBookmarksOutputBookmark) GetFullPath() string {
return fmt.Sprintf("%s#%s", b.FS, b.Name) // TODO use zfs.FilesystemVersion.ToAbsPath
}
func (b ListHoldsAndBookmarksOutputBookmark) MarshalJSON() ([]byte, error) {
return json.Marshal(AbstractionJSON{b})
}
func (b ListHoldsAndBookmarksOutputBookmark) String() string {
return fmt.Sprintf("%s %s", b.Type, b.GetFullPath())
}
func (b ListHoldsAndBookmarksOutputBookmark) GetFilesystemVersion() zfs.FilesystemVersion {
return b.FilesystemVersion
}
func (b ListHoldsAndBookmarksOutputBookmark) Destroy(ctx context.Context) error {
if err := zfs.ZFSDestroyIdempotent(b.GetFullPath()); err != nil {
return errors.Wrapf(err, "destroy %s: zfs", b)
}
return nil
}
type ListHoldsAndBookmarksOutputHold struct {
Type AbstractionType
FS string
zfs.FilesystemVersion
Tag string
JobID JobID
}
func (h ListHoldsAndBookmarksOutputHold) GetType() AbstractionType { return h.Type }
func (h ListHoldsAndBookmarksOutputHold) GetFS() string { return h.FS }
func (h ListHoldsAndBookmarksOutputHold) GetJobID() *JobID { return &h.JobID }
func (h ListHoldsAndBookmarksOutputHold) GetFullPath() string {
return fmt.Sprintf("%s@%s", h.FS, h.GetName()) // TODO use zfs.FilesystemVersion.ToAbsPath
}
func (h ListHoldsAndBookmarksOutputHold) MarshalJSON() ([]byte, error) {
return json.Marshal(AbstractionJSON{h})
}
func (h ListHoldsAndBookmarksOutputHold) String() string {
return fmt.Sprintf("%s %q on %s", h.Type, h.Tag, h.GetFullPath())
}
func (h ListHoldsAndBookmarksOutputHold) GetFilesystemVersion() zfs.FilesystemVersion {
return h.FilesystemVersion
}
func (h ListHoldsAndBookmarksOutputHold) Destroy(ctx context.Context) error {
if err := zfs.ZFSRelease(ctx, h.Tag, h.GetFullPath()); err != nil {
return errors.Wrapf(err, "release %s: zfs", h)
}
return nil
}

1
go.mod
View File

@ -33,5 +33,6 @@ require (
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037
gonum.org/v1/gonum v0.7.0 // indirect
google.golang.org/grpc v1.17.0
)

18
go.sum
View File

@ -5,6 +5,7 @@ github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q
github.com/OpenPeeDeeP/depguard v0.0.0-20180806142446-a69c782687b2/go.mod h1:7/4sitnI9YlQgTLLk734QlzXT8DuHVnAyztLplQjk+o=
github.com/OpenPeeDeeP/depguard v0.0.0-20181229194401-1f388ab2d810/go.mod h1:7/4sitnI9YlQgTLLk734QlzXT8DuHVnAyztLplQjk+o=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@ -28,6 +29,7 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/fatih/color v1.6.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ftrvxmtrx/fd v0.0.0-20150925145434-c6d800382fff h1:zk1wwii7uXmI0znwU+lqg+wFL9G5+vm5I+9rv2let60=
@ -70,6 +72,7 @@ github.com/go-toolsmith/typep v1.0.0/go.mod h1:JSQCQMUPdRlMZFswiq3TGpNp1GMktqkR2
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E=
@ -122,6 +125,7 @@ github.com/jinzhu/copier v0.0.0-20170922082739-db4671f3a9b8/go.mod h1:yL958EeXv8
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
@ -289,6 +293,11 @@ golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2 h1:y102fOLFqhV41b+4GPiJoa0k/x+pJcEi2/HB1Y5T6fU=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20170915142106-8351a756f30f/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -328,17 +337,24 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20170915040203-e531a2a1c15f/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181117154741-2ddaf7f79a09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181205014116-22934f0fdb62/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190110163146-51295c7ec13a/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190121143147-24cd39ecf745/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190213192042-740235f6c0d8/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190311215038-5c2858a9cfe5/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190322203728-c1a832b0ad89/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190521203540-521d6ed310dd/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190524210228-3d17549cdc6b/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.7.0 h1:Hdks0L0hgznZLG9nzXb8vZ0rRvqNvAcgAp84y7Mwkgw=
gonum.org/v1/gonum v0.7.0/go.mod h1:L02bwd0sqlsvRv41G7wGWFCsVNZFv/k1xzGIxeANHGM=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRSs=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
@ -350,6 +366,7 @@ google.golang.org/grpc v1.17.0 h1:TRJYBgMclJvGYn2rIMjj+h9KtMt5r1Ij7ODVRIZkwhk=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@ -368,5 +385,6 @@ mvdan.cc/interfacer v0.0.0-20180901003855-c20040233aed/go.mod h1:Xkxe497xwlCKkIa
mvdan.cc/lint v0.0.0-20170908181259-adc824a0674b/go.mod h1:2odslEg/xrtNQqCYg2/jCoyKnw3vv5biOc3JnIcYfL4=
mvdan.cc/unparam v0.0.0-20190209190245-fbb59629db34/go.mod h1:H6SUd1XjIs+qQCyskXg5OFSrilMRUkD8ePJpHKDPaeY=
mvdan.cc/unparam v0.0.0-20190310220240-1b9ccfa71afe/go.mod h1:BnhuWBAqxH3+J5bDybdxgw5ZfS+DsVd4iylsKQePN8o=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
sourcegraph.com/sqs/pbtypes v1.0.0/go.mod h1:3AciMUv4qUuRHRHhOG4TZOB+72GdPVz5k+c648qsFS4=

View File

@ -1,154 +0,0 @@
package tests
import (
"fmt"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/platformtest"
"github.com/zrepl/zrepl/zfs"
)
type rollupReleaseExpectTags struct {
Snap string
Holds map[string]bool
}
func rollupReleaseTest(ctx *platformtest.Context, cb func(fs string) []rollupReleaseExpectTags) {
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
DESTROYROOT
CREATEROOT
+ "foo bar"
+ "foo bar@1"
+ "foo bar@2"
+ "foo bar@3"
+ "foo bar@4"
+ "foo bar@5"
+ "foo bar@6"
R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@1"
R zfs hold zrepl_platformtest_2 "${ROOTDS}/foo bar@2"
R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@3"
R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@5"
R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@6"
R zfs bookmark "${ROOTDS}/foo bar@5" "${ROOTDS}/foo bar#5"
`)
fs := fmt.Sprintf("%s/foo bar", ctx.RootDataset)
expTags := cb(fs)
for _, exp := range expTags {
holds, err := zfs.ZFSHolds(ctx, fs, exp.Snap)
if err != nil {
panic(err)
}
for _, h := range holds {
if e, ok := exp.Holds[h]; !ok || !e {
panic(fmt.Sprintf("tag %q on snap %q not expected", h, exp.Snap))
}
}
}
}
func RollupReleaseIncluding(ctx *platformtest.Context) {
rollupReleaseTest(ctx, func(fs string) []rollupReleaseExpectTags {
guid5, err := zfs.ZFSGetGUID(fs, "@5")
require.NoError(ctx, err)
err = zfs.ZFSReleaseAllOlderAndIncludingGUID(ctx, fs, guid5, "zrepl_platformtest")
require.NoError(ctx, err)
return []rollupReleaseExpectTags{
{"1", map[string]bool{}},
{"2", map[string]bool{"zrepl_platformtest_2": true}},
{"3", map[string]bool{}},
{"4", map[string]bool{}},
{"5", map[string]bool{}},
{"6", map[string]bool{"zrepl_platformtest": true}},
}
})
}
func RollupReleaseExcluding(ctx *platformtest.Context) {
rollupReleaseTest(ctx, func(fs string) []rollupReleaseExpectTags {
guid5, err := zfs.ZFSGetGUID(fs, "@5")
require.NoError(ctx, err)
err = zfs.ZFSReleaseAllOlderThanGUID(ctx, fs, guid5, "zrepl_platformtest")
require.NoError(ctx, err)
return []rollupReleaseExpectTags{
{"1", map[string]bool{}},
{"2", map[string]bool{"zrepl_platformtest_2": true}},
{"3", map[string]bool{}},
{"4", map[string]bool{}},
{"5", map[string]bool{"zrepl_platformtest": true}},
{"6", map[string]bool{"zrepl_platformtest": true}},
}
})
}
func RollupReleaseMostRecentIsBookmarkWithoutSnapshot(ctx *platformtest.Context) {
rollupReleaseTest(ctx, func(fs string) []rollupReleaseExpectTags {
guid5, err := zfs.ZFSGetGUID(fs, "#5")
require.NoError(ctx, err)
err = zfs.ZFSRelease(ctx, "zrepl_platformtest", fs+"@5")
require.NoError(ctx, err)
err = zfs.ZFSDestroy(fs + "@5")
require.NoError(ctx, err)
err = zfs.ZFSReleaseAllOlderAndIncludingGUID(ctx, fs, guid5, "zrepl_platformtest")
require.NoError(ctx, err)
return []rollupReleaseExpectTags{
{"1", map[string]bool{}},
{"2", map[string]bool{"zrepl_platformtest_2": true}},
{"3", map[string]bool{}},
{"4", map[string]bool{}},
// {"5", map[string]bool{}}, doesn't exist
{"6", map[string]bool{"zrepl_platformtest": true}},
}
})
}
func RollupReleaseMostRecentIsBookmarkAndSnapshotStillExists(ctx *platformtest.Context) {
rollupReleaseTest(ctx, func(fs string) []rollupReleaseExpectTags {
guid5, err := zfs.ZFSGetGUID(fs, "#5")
require.NoError(ctx, err)
err = zfs.ZFSReleaseAllOlderAndIncludingGUID(ctx, fs, guid5, "zrepl_platformtest")
require.NoError(ctx, err)
return []rollupReleaseExpectTags{
{"1", map[string]bool{}},
{"2", map[string]bool{"zrepl_platformtest_2": true}},
{"3", map[string]bool{}},
{"4", map[string]bool{}},
{"5", map[string]bool{}},
{"6", map[string]bool{"zrepl_platformtest": true}},
}
})
}
func RollupReleaseMostRecentDoesntExist(ctx *platformtest.Context) {
rollupReleaseTest(ctx, func(fs string) []rollupReleaseExpectTags {
const nonexistentGuid = 0 // let's take our chances...
err := zfs.ZFSReleaseAllOlderAndIncludingGUID(ctx, fs, nonexistentGuid, "zrepl_platformtest")
require.Error(ctx, err)
require.Contains(ctx, err.Error(), "cannot find snapshot or bookmark with guid 0")
return []rollupReleaseExpectTags{
{"1", map[string]bool{"zrepl_platformtest": true}},
{"2", map[string]bool{"zrepl_platformtest_2": true}},
{"3", map[string]bool{"zrepl_platformtest": true}},
{"4", map[string]bool{"zrepl_platformtest": true}},
{"5", map[string]bool{"zrepl_platformtest": true}},
{"6", map[string]bool{"zrepl_platformtest": true}},
}
})
}

View File

@ -25,6 +25,14 @@ func sendArgVersion(fs, relName string) zfs.ZFSSendArgVersion {
}
}
func fsversion(fs, relname string) zfs.FilesystemVersion {
v, err := zfs.ZFSGetFilesystemVersion(fs + relname)
if err != nil {
panic(err)
}
return v
}
func mustDatasetPath(fs string) *zfs.DatasetPath {
p, err := zfs.NewDatasetPath(fs)
if err != nil {
@ -47,8 +55,8 @@ func mustSnapshot(snap string) {
}
}
func mustGetProps(entity string) zfs.ZFSPropCreateTxgAndGuidProps {
props, err := zfs.ZFSGetCreateTXGAndGuid(entity)
func mustGetFilesystemVersion(snapOrBookmark string) zfs.FilesystemVersion {
props, err := zfs.ZFSGetFilesystemVersion(snapOrBookmark)
check(err)
return props
}
@ -78,7 +86,7 @@ type dummySnapshotSituation struct {
}
type resumeSituation struct {
sendArgs zfs.ZFSSendArgs
sendArgs zfs.ZFSSendArgsUnvalidated
recvOpts zfs.RecvOptions
sendErr, recvErr error
recvErrDecoded *zfs.RecvFailedWithResumeTokenErr
@ -107,7 +115,7 @@ func makeDummyDataSnapshots(ctx *platformtest.Context, sendFS string) (situation
return situation
}
func makeResumeSituation(ctx *platformtest.Context, src dummySnapshotSituation, recvFS string, sendArgs zfs.ZFSSendArgs, recvOptions zfs.RecvOptions) *resumeSituation {
func makeResumeSituation(ctx *platformtest.Context, src dummySnapshotSituation, recvFS string, sendArgs zfs.ZFSSendArgsUnvalidated, recvOptions zfs.RecvOptions) *resumeSituation {
situation := &resumeSituation{}
@ -115,8 +123,13 @@ func makeResumeSituation(ctx *platformtest.Context, src dummySnapshotSituation,
situation.recvOpts = recvOptions
require.True(ctx, recvOptions.SavePartialRecvState, "this method would be pointless otherwise")
require.Equal(ctx, sendArgs.FS, src.sendFS)
sendArgsValidated, err := sendArgs.Validate(ctx)
situation.sendErr = err
if err != nil {
return situation
}
copier, err := zfs.ZFSSend(ctx, sendArgs)
copier, err := zfs.ZFSSend(ctx, sendArgsValidated)
situation.sendErr = err
if err != nil {
return situation

View File

@ -22,7 +22,7 @@ func IdempotentHold(ctx *platformtest.Context) {
`)
fs := fmt.Sprintf("%s/foo bar", ctx.RootDataset)
v1 := sendArgVersion(fs, "@1")
v1 := fsversion(fs, "@1")
tag := "zrepl_platformtest"
err := zfs.ZFSHold(ctx, fs, v1, tag)
@ -34,14 +34,4 @@ func IdempotentHold(ctx *platformtest.Context) {
if err != nil {
panic(err)
}
vnonexistent := zfs.ZFSSendArgVersion{
RelName: "@nonexistent",
GUID: 0xbadf00d,
}
err = zfs.ZFSHold(ctx, fs, vnonexistent, tag)
if err == nil {
panic("still expecting error for nonexistent snapshot")
}
}

View File

@ -31,7 +31,7 @@ func ReplicationCursor(ctx *platformtest.Context) {
}
fs := ds.ToString()
snap := sendArgVersion(fs, "@1 with space")
snap := fsversion(fs, "@1 with space")
destroyed, err := endpoint.MoveReplicationCursor(ctx, fs, &snap, jobid)
if err != nil {
@ -39,7 +39,7 @@ func ReplicationCursor(ctx *platformtest.Context) {
}
assert.Empty(ctx, destroyed)
snapProps, err := zfs.ZFSGetCreateTXGAndGuid(snap.FullPath(fs))
snapProps, err := zfs.ZFSGetFilesystemVersion(snap.FullPath(fs))
if err != nil {
panic(err)
}
@ -56,13 +56,13 @@ func ReplicationCursor(ctx *platformtest.Context) {
}
// try moving
cursor1BookmarkName, err := endpoint.ReplicationCursorBookmarkName(fs, snap.GUID, jobid)
cursor1BookmarkName, err := endpoint.ReplicationCursorBookmarkName(fs, snap.Guid, jobid)
require.NoError(ctx, err)
snap2 := sendArgVersion(fs, "@2 with space")
snap2 := fsversion(fs, "@2 with space")
destroyed, err = endpoint.MoveReplicationCursor(ctx, fs, &snap2, jobid)
require.NoError(ctx, err)
require.Equal(ctx, 1, len(destroyed))
require.Equal(ctx, zfs.Bookmark, destroyed[0].Type)
require.Equal(ctx, cursor1BookmarkName, destroyed[0].Name)
require.Equal(ctx, endpoint.AbstractionReplicationCursorBookmarkV2, destroyed[0].GetType())
require.Equal(ctx, cursor1BookmarkName, destroyed[0].GetName())
}

View File

@ -25,7 +25,7 @@ func ResumableRecvAndTokenHandling(ctx *platformtest.Context) {
src := makeDummyDataSnapshots(ctx, sendFS)
s := makeResumeSituation(ctx, src, recvFS, zfs.ZFSSendArgs{
s := makeResumeSituation(ctx, src, recvFS, zfs.ZFSSendArgsUnvalidated{
FS: sendFS,
To: src.snapA,
Encrypted: &zfs.NilBool{B: false},

View File

@ -23,9 +23,9 @@ func SendArgsValidationEncryptedSendOfUnencryptedDatasetForbidden(ctx *platformt
`)
fs := fmt.Sprintf("%s/send er", ctx.RootDataset)
props := mustGetProps(fs + "@a snap")
props := mustGetFilesystemVersion(fs + "@a snap")
sendArgs := zfs.ZFSSendArgs{
sendArgs, err := zfs.ZFSSendArgsUnvalidated{
FS: fs,
To: &zfs.ZFSSendArgVersion{
RelName: "@a snap",
@ -33,10 +33,15 @@ func SendArgsValidationEncryptedSendOfUnencryptedDatasetForbidden(ctx *platformt
},
Encrypted: &zfs.NilBool{B: true},
ResumeToken: "",
}
stream, err := zfs.ZFSSend(ctx, sendArgs)
}.Validate(ctx)
var stream *zfs.ReadCloserCopier
if err == nil {
defer stream.Close()
stream, err = zfs.ZFSSend(ctx, sendArgs) // no shadow
if err == nil {
defer stream.Close()
}
// fallthrough
}
if expectNotSupportedErr {
@ -76,7 +81,7 @@ func SendArgsValidationResumeTokenEncryptionMismatchForbidden(ctx *platformtest.
src := makeDummyDataSnapshots(ctx, sendFS)
unencS := makeResumeSituation(ctx, src, unencRecvFS, zfs.ZFSSendArgs{
unencS := makeResumeSituation(ctx, src, unencRecvFS, zfs.ZFSSendArgsUnvalidated{
FS: sendFS,
To: src.snapA,
Encrypted: &zfs.NilBool{B: false}, // !
@ -85,7 +90,7 @@ func SendArgsValidationResumeTokenEncryptionMismatchForbidden(ctx *platformtest.
SavePartialRecvState: true,
})
encS := makeResumeSituation(ctx, src, encRecvFS, zfs.ZFSSendArgs{
encS := makeResumeSituation(ctx, src, encRecvFS, zfs.ZFSSendArgsUnvalidated{
FS: sendFS,
To: src.snapA,
Encrypted: &zfs.NilBool{B: true}, // !
@ -97,16 +102,10 @@ func SendArgsValidationResumeTokenEncryptionMismatchForbidden(ctx *platformtest.
// threat model: use of a crafted resume token that requests an unencrypted send
// but send args require encrypted send
{
var maliciousSend zfs.ZFSSendArgs = encS.sendArgs
var maliciousSend zfs.ZFSSendArgsUnvalidated = encS.sendArgs
maliciousSend.ResumeToken = unencS.recvErrDecoded.ResumeTokenRaw
stream, err := zfs.ZFSSend(ctx, maliciousSend)
if err == nil {
defer stream.Close()
}
require.Nil(ctx, stream)
require.Error(ctx, err)
ctx.Logf("send err: %T %s", err, err)
_, err := maliciousSend.Validate(ctx)
validationErr, ok := err.(*zfs.ZFSSendArgsValidationError)
require.True(ctx, ok)
require.Equal(ctx, validationErr.What, zfs.ZFSSendArgsResumeTokenMismatch)
@ -120,14 +119,10 @@ func SendArgsValidationResumeTokenEncryptionMismatchForbidden(ctx *platformtest.
// threat model: use of a crafted resume token that requests an encrypted send
// but send args require unencrypted send
{
var maliciousSend zfs.ZFSSendArgs = unencS.sendArgs
var maliciousSend zfs.ZFSSendArgsUnvalidated = unencS.sendArgs
maliciousSend.ResumeToken = encS.recvErrDecoded.ResumeTokenRaw
stream, err := zfs.ZFSSend(ctx, maliciousSend)
if err == nil {
defer stream.Close()
}
require.Nil(ctx, stream)
_, err := maliciousSend.Validate(ctx)
require.Error(ctx, err)
ctx.Logf("send err: %T %s", err, err)
validationErr, ok := err.(*zfs.ZFSSendArgsValidationError)
@ -169,7 +164,7 @@ func SendArgsValidationResumeTokenDifferentFilesystemForbidden(ctx *platformtest
src1 := makeDummyDataSnapshots(ctx, sendFS1)
src2 := makeDummyDataSnapshots(ctx, sendFS2)
rs := makeResumeSituation(ctx, src1, recvFS, zfs.ZFSSendArgs{
rs := makeResumeSituation(ctx, src1, recvFS, zfs.ZFSSendArgsUnvalidated{
FS: sendFS1,
To: src1.snapA,
Encrypted: &zfs.NilBool{B: false},
@ -180,7 +175,7 @@ func SendArgsValidationResumeTokenDifferentFilesystemForbidden(ctx *platformtest
// threat model: forged resume token tries to steal a full send of snapA on fs2 by
// presenting a resume token for full send of snapA on fs1
var maliciousSend zfs.ZFSSendArgs = zfs.ZFSSendArgs{
var maliciousSend zfs.ZFSSendArgsUnvalidated = zfs.ZFSSendArgsUnvalidated{
FS: sendFS2,
To: &zfs.ZFSSendArgVersion{
RelName: src2.snapA.RelName,
@ -189,12 +184,7 @@ func SendArgsValidationResumeTokenDifferentFilesystemForbidden(ctx *platformtest
Encrypted: &zfs.NilBool{B: false},
ResumeToken: rs.recvErrDecoded.ResumeTokenRaw,
}
stream, err := zfs.ZFSSend(ctx, maliciousSend)
if err == nil {
defer stream.Close()
}
require.Nil(ctx, stream)
_, err = maliciousSend.Validate(ctx)
require.Error(ctx, err)
ctx.Logf("send err: %T %s", err, err)
validationErr, ok := err.(*zfs.ZFSSendArgsValidationError)

View File

@ -18,11 +18,6 @@ var Cases = []Case{
UndestroyableSnapshotParsing,
GetNonexistent,
ReplicationCursor,
RollupReleaseIncluding,
RollupReleaseExcluding,
RollupReleaseMostRecentIsBookmarkWithoutSnapshot,
RollupReleaseMostRecentIsBookmarkAndSnapshotStillExists,
RollupReleaseMostRecentDoesntExist,
IdempotentHold,
IdempotentBookmark,
IdempotentDestroy,

View File

@ -351,18 +351,19 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
log.WithField("token", resumeToken).Debug("decode resume token")
}
// give both sides a hint about how far the replication got
// This serves as a cumulative variant of SendCompleted and can be useful
// give both sides a hint about how far prior replication attempts got
// This serves as a cummulative variant of SendCompleted and can be useful
// for example to release stale holds from an earlier (interrupted) replication.
// TODO FIXME: enqueue this as a replication step instead of doing it here during planning
// then again, the step should run regardless of planning success
// so maybe a separate phase before PLANNING, then?
path, conflict := IncrementalPath(rfsvs, sfsvs)
var sender_mrca *pdu.FilesystemVersion // from sfsvs
var sender_mrca *pdu.FilesystemVersion
if conflict == nil && len(path) > 0 {
sender_mrca = path[0] // shadow
}
if sender_mrca != nil {
// yes, sender_mrca may be nil, indicating that we do not have an mrca
{
var wg sync.WaitGroup
doHint := func(ep Endpoint, name string) {
defer wg.Done()
@ -382,8 +383,6 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
go doHint(fs.sender, "sender")
go doHint(fs.receiver, "receiver")
wg.Wait()
} else {
log.Debug("cannot identify most recent common ancestor, skipping hint")
}
var steps []*Step

View File

@ -0,0 +1,43 @@
package errorarray
import (
"fmt"
"strings"
)
type Errors struct {
Msg string
Wrapped []error
}
var _ error = (*Errors)(nil)
func Wrap(errs []error, msg string) Errors {
if len(errs) == 0 {
panic("passing empty errs argument")
}
return Errors{Msg: msg, Wrapped: errs}
}
func (e Errors) Unwrap() error {
if len(e.Wrapped) == 1 {
return e.Wrapped[0]
}
return nil // ... limitation of the Go 1.13 errors API
}
func (e Errors) Error() string {
if len(e.Wrapped) == 1 {
return fmt.Sprintf("%s: %s", e.Msg, e.Wrapped[0])
}
var buf strings.Builder
fmt.Fprintf(&buf, "%s: multiple errors:\n", e.Msg)
for i, err := range e.Wrapped {
fmt.Fprintf(&buf, "%s", err)
if i != len(e.Wrapped)-1 {
fmt.Fprintf(&buf, "\n")
}
}
return buf.String()
}

View File

@ -7,8 +7,6 @@ import (
"fmt"
"os"
"os/exec"
"sort"
"strconv"
"strings"
"syscall"
@ -36,12 +34,9 @@ func ValidHoldTag(tag string) error {
}
// Idemptotent: does not return an error if the tag already exists
func ZFSHold(ctx context.Context, fs string, v ZFSSendArgVersion, tag string) error {
if err := v.ValidateInMemory(fs); err != nil {
return errors.Wrap(err, "invalid version")
}
func ZFSHold(ctx context.Context, fs string, v FilesystemVersion, tag string) error {
if !v.IsSnapshot() {
return errors.Errorf("can only hold snapshots, got %s", v.RelName)
return errors.Errorf("can only hold snapshots, got %s", v.RelName())
}
if err := validateNotEmpty("tag", tag); err != nil {
@ -131,177 +126,7 @@ func ZFSRelease(ctx context.Context, tag string, snaps ...string) error {
debug("zfs release: no such tag lines=%v otherLines=%v", noSuchTagLines, otherLines)
}
if len(otherLines) > 0 {
return fmt.Errorf("unknown zfs error while releasing hold with tag %q: unidentified stderr lines\n%s", tag, strings.Join(otherLines, "\n"))
return fmt.Errorf("unknown zfs error while releasing hold with tag %q:\n%s", tag, strings.Join(otherLines, "\n"))
}
return nil
}
// Idempotent: if the hold doesn't exist, this is not an error
func ZFSReleaseAllOlderAndIncludingGUID(ctx context.Context, fs string, snapOrBookmarkGuid uint64, tag string) error {
return doZFSReleaseAllOlderAndIncOrExcludingGUID(ctx, fs, snapOrBookmarkGuid, tag, true)
}
// Idempotent: if the hold doesn't exist, this is not an error
func ZFSReleaseAllOlderThanGUID(ctx context.Context, fs string, snapOrBookmarkGuid uint64, tag string) error {
return doZFSReleaseAllOlderAndIncOrExcludingGUID(ctx, fs, snapOrBookmarkGuid, tag, false)
}
type zfsReleaseAllOlderAndIncOrExcludingGUIDZFSListLine struct {
entityType EntityType
name string
createtxg uint64
guid uint64
userrefs uint64 // always 0 for bookmarks
}
func doZFSReleaseAllOlderAndIncOrExcludingGUID(ctx context.Context, fs string, snapOrBookmarkGuid uint64, tag string, includeGuid bool) error {
// TODO channel program support still unreleased but
// might be a huge performance improvement
// https://github.com/zfsonlinux/zfs/pull/7902/files
if err := validateZFSFilesystem(fs); err != nil {
return errors.Wrap(err, "`fs` is not a valid filesystem path")
}
if tag == "" {
return fmt.Errorf("`tag` must not be empty`")
}
output, err := exec.CommandContext(ctx,
"zfs", "list", "-o", "type,name,createtxg,guid,userrefs",
"-H", "-t", "snapshot,bookmark", "-r", "-d", "1", fs).CombinedOutput()
if err != nil {
return &ZFSError{output, errors.Wrap(err, "cannot list snapshots and their userrefs")}
}
lines, err := doZFSReleaseAllOlderAndIncOrExcludingGUIDParseListOutput(output)
if err != nil {
return errors.Wrap(err, "unexpected ZFS output")
}
releaseSnaps, err := doZFSReleaseAllOlderAndIncOrExcludingGUIDFindSnapshots(snapOrBookmarkGuid, includeGuid, lines)
if err != nil {
return err
}
if len(releaseSnaps) == 0 {
return nil
}
return ZFSRelease(ctx, tag, releaseSnaps...)
}
func doZFSReleaseAllOlderAndIncOrExcludingGUIDParseListOutput(output []byte) ([]zfsReleaseAllOlderAndIncOrExcludingGUIDZFSListLine, error) {
scan := bufio.NewScanner(bytes.NewReader(output))
var lines []zfsReleaseAllOlderAndIncOrExcludingGUIDZFSListLine
for scan.Scan() {
const numCols = 5
comps := strings.SplitN(scan.Text(), "\t", numCols)
if len(comps) != numCols {
return nil, fmt.Errorf("not %d columns\n%s", numCols, output)
}
dstype := comps[0]
name := comps[1]
var entityType EntityType
switch dstype {
case "snapshot":
entityType = EntityTypeSnapshot
case "bookmark":
entityType = EntityTypeBookmark
default:
return nil, fmt.Errorf("column 0 is %q, expecting \"snapshot\" or \"bookmark\"", dstype)
}
createtxg, err := strconv.ParseUint(comps[2], 10, 64)
if err != nil {
return nil, fmt.Errorf("cannot parse createtxg %q: %s\n%s", comps[2], err, output)
}
guid, err := strconv.ParseUint(comps[3], 10, 64)
if err != nil {
return nil, fmt.Errorf("cannot parse guid %q: %s\n%s", comps[3], err, output)
}
var userrefs uint64
switch entityType {
case EntityTypeBookmark:
if comps[4] != "-" {
return nil, fmt.Errorf("entity type \"bookmark\" should have userrefs=\"-\", got %q", comps[4])
}
userrefs = 0
case EntityTypeSnapshot:
userrefs, err = strconv.ParseUint(comps[4], 10, 64) // shadow
if err != nil {
return nil, fmt.Errorf("cannot parse userrefs %q: %s\n%s", comps[4], err, output)
}
default:
panic(entityType)
}
lines = append(lines, zfsReleaseAllOlderAndIncOrExcludingGUIDZFSListLine{
entityType: entityType,
name: name,
createtxg: createtxg,
guid: guid,
userrefs: userrefs,
})
}
return lines, nil
}
func doZFSReleaseAllOlderAndIncOrExcludingGUIDFindSnapshots(snapOrBookmarkGuid uint64, includeGuid bool, lines []zfsReleaseAllOlderAndIncOrExcludingGUIDZFSListLine) (releaseSnaps []string, err error) {
// sort lines by createtxg,(snap < bookmark)
// we cannot do this using zfs list -s because `type` is not a
sort.Slice(lines, func(i, j int) (less bool) {
if lines[i].createtxg == lines[j].createtxg {
iET := func(t EntityType) int {
switch t {
case EntityTypeSnapshot:
return 0
case EntityTypeBookmark:
return 1
default:
panic("unexpected entity type " + t.String())
}
}
return iET(lines[i].entityType) < iET(lines[j].entityType)
}
return lines[i].createtxg < lines[j].createtxg
})
// iterate over snapshots oldest to newest and collect snapshots that have holds and
// are older than (inclusive or exclusive, depends on includeGuid) a snapshot or bookmark
// with snapOrBookmarkGuid
foundGuid := false
for _, line := range lines {
if line.guid == snapOrBookmarkGuid {
foundGuid = true
}
if line.userrefs > 0 {
if !foundGuid || (foundGuid && includeGuid) {
// only snapshots have userrefs > 0, no need to check entityType
releaseSnaps = append(releaseSnaps, line.name)
}
}
if foundGuid {
// The secondary key in sorting (snap < bookmark) guarantees that we
// A) either found the snapshot with snapOrBookmarkGuid
// B) or no snapshot with snapGuid exists, but one or more bookmarks of it exists
// In the case of A, we already added the snapshot to releaseSnaps if includeGuid requests it,
// and can ignore possible subsequent bookmarks of the snapshot.
// In the case of B, there is nothing to add to releaseSnaps.
break
}
}
if !foundGuid {
return nil, fmt.Errorf("cannot find snapshot or bookmark with guid %v", snapOrBookmarkGuid)
}
return releaseSnaps, nil
}

View File

@ -1,38 +0,0 @@
package zfs
import (
"testing"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestDoZFSReleaseAllOlderAndIncOrExcludingGUIDFindSnapshots(t *testing.T) {
// what we test here: sort bookmark #3 before @3
// => assert that the function doesn't stop at the first guid match
// (which might be a bookmark, depending on zfs list ordering)
// but instead considers the entire stride of bookmarks and snapshots with that guid
//
// also, throw in unordered createtxg for good measure
list, err := doZFSReleaseAllOlderAndIncOrExcludingGUIDParseListOutput(
[]byte("snapshot\tfoo@1\t1\t1013001\t1\n" +
"snapshot\tfoo@2\t2\t2013002\t1\n" +
"bookmark\tfoo#3\t3\t7013003\t-\n" +
"snapshot\tfoo@6\t6\t5013006\t1\n" +
"snapshot\tfoo@3\t3\t7013003\t1\n" +
"snapshot\tfoo@4\t3\t6013004\t1\n" +
""),
)
require.NoError(t, err)
t.Log(pretty.Sprint(list))
require.Equal(t, 6, len(list))
require.Equal(t, EntityTypeBookmark, list[2].entityType)
releaseSnaps, err := doZFSReleaseAllOlderAndIncOrExcludingGUIDFindSnapshots(7013003, true, list)
t.Logf("releasedSnaps = %#v", releaseSnaps)
assert.NoError(t, err)
assert.Equal(t, []string{"foo@1", "foo@2", "foo@3"}, releaseSnaps)
}

View File

@ -55,6 +55,7 @@ func DecomposeVersionString(v string) (fs string, versionType VersionType, name
}
}
// The data in a FilesystemVersion is guaranteed to stem from a ZFS CLI invocation.
type FilesystemVersion struct {
Type VersionType
@ -72,9 +73,16 @@ type FilesystemVersion struct {
Creation time.Time
}
func (v FilesystemVersion) String() string {
func (v FilesystemVersion) GetCreateTXG() uint64 { return v.CreateTXG }
func (v FilesystemVersion) GetGUID() uint64 { return v.Guid }
func (v FilesystemVersion) GetGuid() uint64 { return v.Guid }
func (v FilesystemVersion) GetName() string { return v.Name }
func (v FilesystemVersion) IsSnapshot() bool { return v.Type == Snapshot }
func (v FilesystemVersion) IsBookmark() bool { return v.Type == Bookmark }
func (v FilesystemVersion) RelName() string {
return fmt.Sprintf("%s%s", v.Type.DelimiterChar(), v.Name)
}
func (v FilesystemVersion) String() string { return v.RelName() }
func (v FilesystemVersion) ToAbsPath(p *DatasetPath) string {
var b bytes.Buffer
@ -84,6 +92,49 @@ func (v FilesystemVersion) ToAbsPath(p *DatasetPath) string {
return b.String()
}
func (v FilesystemVersion) FullPath(fs string) string {
return fmt.Sprintf("%s%s", fs, v.RelName())
}
func (v FilesystemVersion) ToSendArgVersion() ZFSSendArgVersion {
return ZFSSendArgVersion{
RelName: v.RelName(),
GUID: v.Guid,
}
}
type ParseFilesystemVersionArgs struct {
fullname string
guid, createtxg, creation string
}
func ParseFilesystemVersion(args ParseFilesystemVersionArgs) (v FilesystemVersion, err error) {
_, v.Type, v.Name, err = DecomposeVersionString(args.fullname)
if err != nil {
return v, err
}
if v.Guid, err = strconv.ParseUint(args.guid, 10, 64); err != nil {
err = errors.Wrapf(err, "cannot parse GUID %q", args.guid)
return v, err
}
if v.CreateTXG, err = strconv.ParseUint(args.createtxg, 10, 64); err != nil {
err = errors.Wrapf(err, "cannot parse CreateTXG %q", args.createtxg)
return v, err
}
creationUnix, err := strconv.ParseInt(args.creation, 10, 64)
if err != nil {
err = errors.Wrapf(err, "cannot parse creation date %q", args.creation)
return v, err
} else {
v.Creation = time.Unix(creationUnix, 0)
}
return v, nil
}
type FilesystemVersionFilter interface {
Filter(t VersionType, name string) (accept bool, err error)
}
@ -126,32 +177,17 @@ func ZFSListFilesystemVersions(fs *DatasetPath, filter FilesystemVersionFilter)
}
line := listResult.Fields
var v FilesystemVersion
_, v.Type, v.Name, err = DecomposeVersionString(line[0])
args := ParseFilesystemVersionArgs{
fullname: line[0],
guid: line[1],
createtxg: line[2],
creation: line[3],
}
v, err := ParseFilesystemVersion(args)
if err != nil {
return nil, err
}
if v.Guid, err = strconv.ParseUint(line[1], 10, 64); err != nil {
err = errors.Wrap(err, "cannot parse GUID")
return
}
if v.CreateTXG, err = strconv.ParseUint(line[2], 10, 64); err != nil {
err = errors.Wrap(err, "cannot parse CreateTXG")
return
}
creationUnix, err := strconv.ParseInt(line[3], 10, 64)
if err != nil {
err = fmt.Errorf("cannot parse creation date '%s': %s", line[3], err)
return nil, err
} else {
v.Creation = time.Unix(creationUnix, 0)
}
accept := true
if filter != nil {
accept, err = filter.Filter(v.Type, v.Name)
@ -167,3 +203,16 @@ func ZFSListFilesystemVersions(fs *DatasetPath, filter FilesystemVersionFilter)
}
return
}
func ZFSGetFilesystemVersion(ds string) (v FilesystemVersion, _ error) {
props, err := zfsGet(ds, []string{"createtxg", "guid", "creation"}, sourceAny)
if err != nil {
return v, err
}
return ParseFilesystemVersion(ParseFilesystemVersionArgs{
fullname: ds,
createtxg: props.Get("createtxg"),
guid: props.Get("guid"),
creation: props.Get("creation"),
})
}

View File

@ -314,7 +314,7 @@ func absVersion(fs string, v *ZFSSendArgVersion) (full string, err error) {
// a must already be validated
//
// SECURITY SENSITIVE because Raw must be handled correctly
func (a ZFSSendArgs) buildCommonSendArgs() ([]string, error) {
func (a ZFSSendArgsUnvalidated) buildCommonSendArgs() ([]string, error) {
args := make([]string, 0, 3)
// ResumeToken takes precedence, we assume that it has been validated to reflect
@ -525,6 +525,9 @@ type ZFSSendArgVersion struct {
GUID uint64
}
func (v ZFSSendArgVersion) GetGuid() uint64 { return v.GUID }
func (v ZFSSendArgVersion) ToSendArgVersion() ZFSSendArgVersion { return v }
func (v ZFSSendArgVersion) ValidateInMemory(fs string) error {
if fs == "" {
panic(fs)
@ -558,26 +561,26 @@ func (v ZFSSendArgVersion) mustValidateInMemory(fs string) {
}
// fs must be not empty
func (a ZFSSendArgVersion) ValidateExistsAndGetCheckedProps(ctx context.Context, fs string) (ZFSPropCreateTxgAndGuidProps, error) {
func (a ZFSSendArgVersion) ValidateExistsAndGetVersion(ctx context.Context, fs string) (v FilesystemVersion, _ error) {
if err := a.ValidateInMemory(fs); err != nil {
return ZFSPropCreateTxgAndGuidProps{}, nil
return v, nil
}
realProps, err := ZFSGetCreateTXGAndGuid(a.FullPath(fs))
realVersion, err := ZFSGetFilesystemVersion(a.FullPath(fs))
if err != nil {
return ZFSPropCreateTxgAndGuidProps{}, err
return v, err
}
if realProps.Guid != a.GUID {
return ZFSPropCreateTxgAndGuidProps{}, fmt.Errorf("`GUID` field does not match real dataset's GUID: %q != %q", realProps.Guid, a.GUID)
if realVersion.Guid != a.GUID {
return v, fmt.Errorf("`GUID` field does not match real dataset's GUID: %q != %q", realVersion.Guid, a.GUID)
}
return realProps, nil
return realVersion, nil
}
func (a ZFSSendArgVersion) ValidateExists(ctx context.Context, fs string) error {
_, err := a.ValidateExistsAndGetCheckedProps(ctx, fs)
_, err := a.ValidateExistsAndGetVersion(ctx, fs)
return err
}
@ -619,7 +622,7 @@ func (n *NilBool) String() string {
}
// When updating this struct, check Validate and ValidateCorrespondsToResumeToken (POTENTIALLY SECURITY SENSITIVE)
type ZFSSendArgs struct {
type ZFSSendArgsUnvalidated struct {
FS string
From, To *ZFSSendArgVersion // From may be nil
Encrypted *NilBool
@ -628,6 +631,12 @@ type ZFSSendArgs struct {
ResumeToken string // if not nil, must match what is specified in From, To (covered by ValidateCorrespondsToResumeToken)
}
type ZFSSendArgsValidated struct {
ZFSSendArgsUnvalidated
FromVersion *FilesystemVersion
ToVersion FilesystemVersion
}
type zfsSendArgsValidationContext struct {
encEnabled *NilBool
}
@ -642,16 +651,16 @@ const (
)
type ZFSSendArgsValidationError struct {
Args ZFSSendArgs
Args ZFSSendArgsUnvalidated
What ZFSSendArgsValidationErrorCode
Msg error
}
func newValidationError(sendArgs ZFSSendArgs, what ZFSSendArgsValidationErrorCode, cause error) *ZFSSendArgsValidationError {
func newValidationError(sendArgs ZFSSendArgsUnvalidated, what ZFSSendArgsValidationErrorCode, cause error) *ZFSSendArgsValidationError {
return &ZFSSendArgsValidationError{sendArgs, what, cause}
}
func newGenericValidationError(sendArgs ZFSSendArgs, cause error) *ZFSSendArgsValidationError {
func newGenericValidationError(sendArgs ZFSSendArgsUnvalidated, cause error) *ZFSSendArgsValidationError {
return &ZFSSendArgsValidationError{sendArgs, ZFSSendArgsGenericValidationError, cause}
}
@ -663,49 +672,57 @@ func (e ZFSSendArgsValidationError) Error() string {
// - Make sure that if ResumeToken != "", it reflects the same operation as the other parameters would.
//
// This function is not pure because GUIDs are checked against the local host's datasets.
func (a ZFSSendArgs) Validate(ctx context.Context) error {
func (a ZFSSendArgsUnvalidated) Validate(ctx context.Context) (v ZFSSendArgsValidated, _ error) {
if dp, err := NewDatasetPath(a.FS); err != nil || dp.Length() == 0 {
return newGenericValidationError(a, fmt.Errorf("`FS` must be a valid non-zero dataset path"))
return v, newGenericValidationError(a, fmt.Errorf("`FS` must be a valid non-zero dataset path"))
}
if a.To == nil {
return newGenericValidationError(a, fmt.Errorf("`To` must not be nil"))
return v, newGenericValidationError(a, fmt.Errorf("`To` must not be nil"))
}
if err := a.To.ValidateExists(ctx, a.FS); err != nil {
return newGenericValidationError(a, errors.Wrap(err, "`To` invalid"))
toVersion, err := a.To.ValidateExistsAndGetVersion(ctx, a.FS)
if err != nil {
return v, newGenericValidationError(a, errors.Wrap(err, "`To` invalid"))
}
var fromVersion *FilesystemVersion
if a.From != nil {
if err := a.From.ValidateExists(ctx, a.FS); err != nil {
return newGenericValidationError(a, errors.Wrap(err, "`From` invalid"))
fromV, err := a.From.ValidateExistsAndGetVersion(ctx, a.FS)
if err != nil {
return v, newGenericValidationError(a, errors.Wrap(err, "`From` invalid"))
}
fromVersion = &fromV
// fallthrough
}
if err := a.Encrypted.Validate(); err != nil {
return newGenericValidationError(a, errors.Wrap(err, "`Raw` invalid"))
return v, newGenericValidationError(a, errors.Wrap(err, "`Raw` invalid"))
}
valCtx := &zfsSendArgsValidationContext{}
fsEncrypted, err := ZFSGetEncryptionEnabled(ctx, a.FS)
if err != nil {
return newValidationError(a, ZFSSendArgsFSEncryptionCheckFail,
return v, newValidationError(a, ZFSSendArgsFSEncryptionCheckFail,
errors.Wrapf(err, "cannot check whether filesystem %q is encrypted", a.FS))
}
valCtx.encEnabled = &NilBool{fsEncrypted}
if a.Encrypted.B && !fsEncrypted {
return newValidationError(a, ZFSSendArgsEncryptedSendRequestedButFSUnencrypted,
return v, newValidationError(a, ZFSSendArgsEncryptedSendRequestedButFSUnencrypted,
errors.Errorf("encrypted send requested, but filesystem %q is not encrypted", a.FS))
}
if a.ResumeToken != "" {
if err := a.validateCorrespondsToResumeToken(ctx, valCtx); err != nil {
return newValidationError(a, ZFSSendArgsResumeTokenMismatch, err)
return v, newValidationError(a, ZFSSendArgsResumeTokenMismatch, err)
}
}
return nil
return ZFSSendArgsValidated{
ZFSSendArgsUnvalidated: a,
FromVersion: fromVersion,
ToVersion: toVersion,
}, nil
}
type ZFSSendArgsResumeTokenMismatchError struct {
@ -735,7 +752,7 @@ func (c ZFSSendArgsResumeTokenMismatchErrorCode) fmt(format string, args ...inte
// This is SECURITY SENSITIVE and requires exhaustive checking of both side's values
// An attacker requesting a Send with a crafted ResumeToken may encode different parameters in the resume token than expected:
// for example, they may specify another file system (e.g. the filesystem with secret data) or request unencrypted send instead of encrypted raw send.
func (a ZFSSendArgs) validateCorrespondsToResumeToken(ctx context.Context, valCtx *zfsSendArgsValidationContext) error {
func (a ZFSSendArgsUnvalidated) validateCorrespondsToResumeToken(ctx context.Context, valCtx *zfsSendArgsValidationContext) error {
if a.ResumeToken == "" {
return nil // nothing to do
@ -808,7 +825,7 @@ var ErrEncryptedSendNotSupported = fmt.Errorf("raw sends which are required for
// (if from is "" a full ZFS send is done)
//
// Returns ErrEncryptedSendNotSupported if encrypted send is requested but not supported by CLI
func ZFSSend(ctx context.Context, sendArgs ZFSSendArgs) (*ReadCloserCopier, error) {
func ZFSSend(ctx context.Context, sendArgs ZFSSendArgsValidated) (*ReadCloserCopier, error) {
args := make([]string, 0)
args = append(args, "send")
@ -825,10 +842,6 @@ func ZFSSend(ctx context.Context, sendArgs ZFSSendArgs) (*ReadCloserCopier, erro
}
}
if err := sendArgs.Validate(ctx); err != nil {
return nil, err // do not wrap, part of API, tested by platformtest
}
sargs, err := sendArgs.buildCommonSendArgs()
if err != nil {
return nil, err
@ -959,11 +972,7 @@ func (s *DrySendInfo) unmarshalInfoLine(l string) (regexMatched bool, err error)
// to may be "", in which case a full ZFS send is done
// May return BookmarkSizeEstimationNotSupported as err if from is a bookmark.
func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgs) (_ *DrySendInfo, err error) {
if err := sendArgs.Validate(ctx); err != nil {
return nil, errors.Wrap(err, "cannot validate send args")
}
func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgsValidated) (_ *DrySendInfo, err error) {
if sendArgs.From != nil && strings.Contains(sendArgs.From.RelName, "#") {
/* TODO:
@ -1458,41 +1467,6 @@ func zfsGet(path string, props []string, allowedSources zfsPropertySource) (*ZFS
return res, nil
}
type ZFSPropCreateTxgAndGuidProps struct {
CreateTXG, Guid uint64
}
func ZFSGetCreateTXGAndGuid(ds string) (ZFSPropCreateTxgAndGuidProps, error) {
props, err := zfsGetNumberProps(ds, []string{"createtxg", "guid"}, sourceAny)
if err != nil {
return ZFSPropCreateTxgAndGuidProps{}, err
}
return ZFSPropCreateTxgAndGuidProps{
CreateTXG: props["createtxg"],
Guid: props["guid"],
}, nil
}
// returns *DatasetDoesNotExist if the dataset does not exist
func zfsGetNumberProps(ds string, props []string, src zfsPropertySource) (map[string]uint64, error) {
sps, err := zfsGet(ds, props, sourceAny)
if err != nil {
if _, ok := err.(*DatasetDoesNotExist); ok {
return nil, err // pass through as is
}
return nil, errors.Wrap(err, "zfs: set replication cursor: get snapshot createtxg")
}
r := make(map[string]uint64, len(props))
for _, p := range props {
v, err := strconv.ParseUint(sps.Get(p), 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "zfs get: parse number property %q", p)
}
r[p] = v
}
return r, nil
}
type DestroySnapshotsError struct {
RawLines []string
Filesystem string