diff --git a/client/holds.go b/client/holds.go index 8db93d3..adc0a36 100644 --- a/client/holds.go +++ b/client/holds.go @@ -1,13 +1,11 @@ package client import ( - "context" - "encoding/json" "fmt" - "os" + "sort" "strings" - "github.com/pkg/errors" + "github.com/spf13/pflag" "github.com/zrepl/zrepl/cli" "github.com/zrepl/zrepl/daemon/filters" @@ -20,68 +18,126 @@ var ( Use: "holds", Short: "manage holds & step bookmarks", SetupSubcommands: func() []*cli.Subcommand { - return holdsList + return []*cli.Subcommand{ + holdsCmdList, + holdsCmdReleaseAll, + holdsCmdReleaseStale, + } }, } ) -var holdsList = []*cli.Subcommand{ - &cli.Subcommand{ - Use: "list [FSFILTER]", - Run: doHoldsList, - NoRequireConfig: true, - Short: ` -FSFILTER SYNTAX: -representation of a 'filesystems' filter statement on the command line - `, - }, +// a common set of CLI flags that map to the fields of an +// endpoint.ListZFSHoldsAndBookmarksQuery +type holdsFilterFlags struct { + Filesystems FilesystemsFilterFlag + Job JobIDFlag + Types AbstractionTypesFlag } -func fsfilterFromCliArg(arg string) (zfs.DatasetFilter, error) { - mappings := strings.Split(arg, ",") +// produce a query from the CLI flags +func (f holdsFilterFlags) Query() (endpoint.ListZFSHoldsAndBookmarksQuery, error) { + q := endpoint.ListZFSHoldsAndBookmarksQuery{ + FS: f.Filesystems.FlagValue(), + What: f.Types.FlagValue(), + JobID: f.Job.FlagValue(), + Until: nil, // TODO support this as a flag + } + return q, q.Validate() +} + +func (f *holdsFilterFlags) registerHoldsFilterFlags(s *pflag.FlagSet, verb string) { + // Note: the default value is defined in the .FlagValue methods + s.Var(&f.Filesystems, "fs", fmt.Sprintf("only %s holds on the specified filesystem [default: all filesystems] [comma-separated list of : pairs]", verb)) + s.Var(&f.Job, "job", fmt.Sprintf("only %s holds created by the specified job [default: any job]", verb)) + + variants := make([]string, 0, len(endpoint.AbstractionTypesAll)) + for v := range endpoint.AbstractionTypesAll { + variants = append(variants, string(v)) + } + variants = sort.StringSlice(variants) + variantsJoined := strings.Join(variants, "|") + s.Var(&f.Types, "type", fmt.Sprintf("only %s holds of the specified type [default: all] [comma-separated list of %s]", verb, variantsJoined)) +} + +type JobIDFlag struct{ J *endpoint.JobID } + +func (f *JobIDFlag) Set(s string) error { + if len(s) == 0 { + *f = JobIDFlag{J: nil} + return nil + } + + jobID, err := endpoint.MakeJobID(s) + if err != nil { + return err + } + *f = JobIDFlag{J: &jobID} + return nil +} +func (f JobIDFlag) Type() string { return "job-ID" } +func (f JobIDFlag) String() string { return fmt.Sprint(f.J) } +func (f JobIDFlag) FlagValue() *endpoint.JobID { return f.J } + +type AbstractionTypesFlag map[endpoint.AbstractionType]bool + +func (f *AbstractionTypesFlag) Set(s string) error { + ats, err := endpoint.AbstractionTypeSetFromStrings(strings.Split(s, ",")) + if err != nil { + return err + } + *f = AbstractionTypesFlag(ats) + return nil +} +func (f AbstractionTypesFlag) Type() string { return "abstraction-type" } +func (f AbstractionTypesFlag) String() string { + return endpoint.AbstractionTypeSet(f).String() +} +func (f AbstractionTypesFlag) FlagValue() map[endpoint.AbstractionType]bool { + if len(f) > 0 { + return f + } + return endpoint.AbstractionTypesAll +} + +type FilesystemsFilterFlag struct { + F endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter +} + +func (flag *FilesystemsFilterFlag) Set(s string) error { + mappings := strings.Split(s, ",") + if len(mappings) == 1 { + flag.F = endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{ + FS: &mappings[0], + } + return nil + } + f := filters.NewDatasetMapFilter(len(mappings), true) for _, m := range mappings { thisMappingErr := fmt.Errorf("expecting comma-separated list of : pairs, got %q", m) lhsrhs := strings.SplitN(m, ":", 2) if len(lhsrhs) != 2 { - return nil, thisMappingErr + return thisMappingErr } err := f.Add(lhsrhs[0], lhsrhs[1]) if err != nil { - return nil, fmt.Errorf("%s: %s", thisMappingErr, err) + return fmt.Errorf("%s: %s", thisMappingErr, err) } } - return f.AsFilter(), nil -} - -func doHoldsList(sc *cli.Subcommand, args []string) error { - var err error - ctx := context.Background() - - if len(args) > 1 { - return errors.New("this subcommand takes at most one argument") + flag.F = endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{ + Filter: f, } - - var filter zfs.DatasetFilter - if len(args) == 0 { - filter = zfs.NoFilter() - } else { - filter, err = fsfilterFromCliArg(args[0]) - if err != nil { - return errors.Wrap(err, "cannot parse filesystem filter args") - } - } - - listing, err := endpoint.ListZFSHoldsAndBookmarks(ctx, filter) - if err != nil { - return err // context clear by invocation of command - } - - enc := json.NewEncoder(os.Stdout) - enc.SetIndent(" ", " ") - if err := enc.Encode(listing); err != nil { - panic(err) - } - return nil } +func (flag FilesystemsFilterFlag) Type() string { return "filesystem filter spec" } +func (flag FilesystemsFilterFlag) String() string { + return fmt.Sprintf("%v", flag.F) +} +func (flag FilesystemsFilterFlag) FlagValue() endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter { + var z FilesystemsFilterFlag + if flag == z { + return endpoint.ListZFSHoldsAndBookmarksQueryFilesystemFilter{Filter: zfs.NoFilter()} + } + return flag.F +} diff --git a/client/holds_list.go b/client/holds_list.go new file mode 100644 index 0000000..3f9b1a7 --- /dev/null +++ b/client/holds_list.go @@ -0,0 +1,71 @@ +package client + +import ( + "context" + "encoding/json" + "fmt" + "os" + + "github.com/fatih/color" + "github.com/pkg/errors" + "github.com/spf13/pflag" + "github.com/zrepl/zrepl/cli" + "github.com/zrepl/zrepl/endpoint" +) + +var holdsListFlags struct { + Filter holdsFilterFlags + Json bool +} + +var holdsCmdList = &cli.Subcommand{ + Use: "list", + Run: doHoldsList, + NoRequireConfig: true, + Short: "list holds and bookmarks", + SetupFlags: func(f *pflag.FlagSet) { + holdsListFlags.Filter.registerHoldsFilterFlags(f, "list") + f.BoolVar(&holdsListFlags.Json, "json", false, "emit JSON") + }, +} + +func doHoldsList(sc *cli.Subcommand, args []string) error { + var err error + ctx := context.Background() + + if len(args) > 0 { + return errors.New("this subcommand takes no positional arguments") + } + + q, err := holdsListFlags.Filter.Query() + if err != nil { + return errors.Wrap(err, "invalid filter specification on command line") + } + + abstractions, errors, err := endpoint.ListAbstractions(ctx, q) + if err != nil { + return err // context clear by invocation of command + } + + // always print what we got + if holdsListFlags.Json { + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + if err := enc.Encode(abstractions); err != nil { + panic(err) + } + fmt.Println() + } else { + for _, a := range abstractions { + fmt.Println(a) + } + } + + // then potential errors, so that users always see them + if len(errors) > 0 { + color.New(color.FgRed).Fprintf(os.Stderr, "there were errors in listing the abstractions:\n%s\n", errors) + return fmt.Errorf("") + } else { + return nil + } +} diff --git a/client/holds_release.go b/client/holds_release.go new file mode 100644 index 0000000..b97c1f9 --- /dev/null +++ b/client/holds_release.go @@ -0,0 +1,144 @@ +package client + +import ( + "context" + "encoding/json" + "fmt" + "os" + + "github.com/fatih/color" + "github.com/pkg/errors" + "github.com/spf13/pflag" + "github.com/zrepl/zrepl/cli" + "github.com/zrepl/zrepl/endpoint" +) + +// shared between release-all and release-step +var holdsReleaseFlags struct { + Filter holdsFilterFlags + Json bool + DryRun bool +} + +func registerHoldsReleaseFlags(s *pflag.FlagSet) { + holdsReleaseFlags.Filter.registerHoldsFilterFlags(s, "release") + s.BoolVar(&holdsReleaseFlags.Json, "json", false, "emit json instead of pretty-printed") + s.BoolVar(&holdsReleaseFlags.DryRun, "dry-run", false, "do a dry-run") +} + +var holdsCmdReleaseAll = &cli.Subcommand{ + Use: "release-all", + Run: doHoldsReleaseAll, + NoRequireConfig: true, + Short: `(DANGEROUS) release all zrepl-managed holds and bookmarks, mostly useful for uninstalling zrepl`, + SetupFlags: registerHoldsReleaseFlags, +} + +var holdsCmdReleaseStale = &cli.Subcommand{ + Use: "release-stale", + Run: doHoldsReleaseStale, + NoRequireConfig: true, + Short: `release stale zrepl-managed holds and boomkarks (useful if zrepl has a bug and doesn't do it by itself)`, + SetupFlags: registerHoldsReleaseFlags, +} + +func doHoldsReleaseAll(sc *cli.Subcommand, args []string) error { + var err error + ctx := context.Background() + + if len(args) > 0 { + return errors.New("this subcommand takes no positional arguments") + } + + q, err := holdsReleaseFlags.Filter.Query() + if err != nil { + return errors.Wrap(err, "invalid filter specification on command line") + } + + abstractions, listErrors, err := endpoint.ListAbstractions(ctx, q) + if err != nil { + return err // context clear by invocation of command + } + if len(listErrors) > 0 { + color.New(color.FgRed).Fprintf(os.Stderr, "there were errors in listing the abstractions:\n%s\n", listErrors) + // proceed anyways with rest of abstractions + } + + return doHoldsRelease_Common(ctx, abstractions) +} + +func doHoldsReleaseStale(sc *cli.Subcommand, args []string) error { + + var err error + ctx := context.Background() + + if len(args) > 0 { + return errors.New("this subcommand takes no positional arguments") + } + + q, err := holdsReleaseFlags.Filter.Query() + if err != nil { + return errors.Wrap(err, "invalid filter specification on command line") + } + + stalenessInfo, err := endpoint.ListStale(ctx, q) + if err != nil { + return err // context clear by invocation of command + } + + return doHoldsRelease_Common(ctx, stalenessInfo.Stale) +} + +func doHoldsRelease_Common(ctx context.Context, destroy []endpoint.Abstraction) error { + + if holdsReleaseFlags.DryRun { + if holdsReleaseFlags.Json { + m, err := json.MarshalIndent(destroy, "", " ") + if err != nil { + panic(err) + } + if _, err := os.Stdout.Write(m); err != nil { + panic(err) + } + fmt.Println() + } else { + for _, a := range destroy { + fmt.Printf("would destroy %s\n", a) + } + } + return nil + } + + outcome := endpoint.BatchDestroy(ctx, destroy) + hadErr := false + + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + colorErr := color.New(color.FgRed) + printfSuccess := color.New(color.FgGreen).FprintfFunc() + printfSection := color.New(color.Bold).FprintfFunc() + + for res := range outcome { + hadErr = hadErr || res.DestroyErr != nil + if holdsReleaseFlags.Json { + err := enc.Encode(res) + if err != nil { + colorErr.Fprintf(os.Stderr, "cannot marshal there were errors in destroying the abstractions") + } + } else { + printfSection(os.Stdout, "destroy %s ...", res.Abstraction) + if res.DestroyErr != nil { + colorErr.Fprintf(os.Stdout, " failed:\n%s\n", res.DestroyErr) + } else { + printfSuccess(os.Stdout, " OK\n") + } + } + } + + if hadErr { + colorErr.Add(color.Bold).Fprintf(os.Stderr, "there were errors in destroying the abstractions") + return fmt.Errorf("") + } else { + return nil + } +} diff --git a/docs/configuration/overview.rst b/docs/configuration/overview.rst index 9d1e5af..2753548 100644 --- a/docs/configuration/overview.rst +++ b/docs/configuration/overview.rst @@ -120,7 +120,7 @@ The following steps take place during replication and can be monitored using the * Perform replication steps in the following order: Among all filesystems with pending replication steps, pick the filesystem whose next replication step's snapshot is the oldest. * Create placeholder filesystems on the receiving side to mirror the dataset paths on the sender to ``root_fs/${client_identity}``. - * Acquire send-side step-holds on the step's `from` and `to` snapshots. + * Acquire send-side *step-holds* on the step's `from` and `to` snapshots. * Perform the replication step. * Move the **replication cursor** bookmark on the sending side (see below). * Move the **last-received-hold** on the receiving side (see below). diff --git a/docs/usage.rst b/docs/usage.rst index e6df674..742ccab 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -38,6 +38,8 @@ CLI Overview * - ``zrepl migrate`` - | perform on-disk state / ZFS property migrations | (see :ref:`changelog ` for details) + * - ``zrepl holds`` + - list and remove holds and step bookmarks created by zrepl (see :ref:`overview ` ) .. _usage-zrepl-daemon: diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go index 1f6862f..2ef5552 100644 --- a/endpoint/endpoint.go +++ b/endpoint/endpoint.go @@ -113,25 +113,40 @@ func (p *Sender) HintMostRecentCommonAncestor(ctx context.Context, r *pdu.HintMo var err error fs := r.GetFilesystem() - mostRecent, err := sendArgsFromPDUAndValidateExists(ctx, fs, r.GetSenderVersion()) + log := getLogger(ctx).WithField("fs", fs).WithField("hinted_most_recent", fmt.Sprintf("%#v", r.GetSenderVersion())) + // FIXME check if fs exists AND IS PERMITTED by p.filesystems + + log.WithField("full_hint", r).Debug("full hint") + + if r.GetSenderVersion() == nil { + // no common ancestor found, likely due to failed prior replication attempt + // => release stale step holds to prevent them from accumulating + // (they can accumulate on initial replication because each inital replication step might hold a different `to`) + // => replication cursors cannot accumulate because we always _move_ the replication cursor + log.Debug("releasing all step holds on the filesystem") + TryReleaseStepStaleFS(ctx, fs, p.jobId) + return &pdu.HintMostRecentCommonAncestorRes{}, nil + } + // we were hinted a specific common ancestor + + mostRecentVersion, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, r.GetSenderVersion()) if err != nil { msg := "HintMostRecentCommonAncestor rpc with nonexistent most recent version" - getLogger(ctx).WithField("fs", fs).WithField("hinted_most_recent", fmt.Sprintf("%#v", mostRecent)). - Warn(msg) + log.Warn(msg) return nil, errors.Wrap(err, msg) } // move replication cursor to this position - _, err = MoveReplicationCursor(ctx, fs, mostRecent, p.jobId) + _, err = MoveReplicationCursor(ctx, fs, mostRecentVersion, p.jobId) if err == zfs.ErrBookmarkCloningNotSupported { - getLogger(ctx).Debug("not creating replication cursor from bookmark because ZFS does not support it") + log.Debug("not creating replication cursor from bookmark because ZFS does not support it") // fallthrough } else if err != nil { return nil, errors.Wrap(err, "cannot set replication cursor to hinted version") } // cleanup previous steps - if err := ReleaseStepAll(ctx, fs, mostRecent, p.jobId); err != nil { + if err := ReleaseStepCummulativeInclusive(ctx, fs, mostRecentVersion, p.jobId); err != nil { return nil, errors.Wrap(err, "cannot cleanup prior invocation's step holds and bookmarks") } @@ -147,15 +162,16 @@ func uncheckedSendArgsFromPDU(fsv *pdu.FilesystemVersion) *zfs.ZFSSendArgVersion return &zfs.ZFSSendArgVersion{RelName: fsv.GetRelName(), GUID: fsv.Guid} } -func sendArgsFromPDUAndValidateExists(ctx context.Context, fs string, fsv *pdu.FilesystemVersion) (*zfs.ZFSSendArgVersion, error) { - v := uncheckedSendArgsFromPDU(fsv) - if v == nil { - return nil, errors.New("must not be nil") +func sendArgsFromPDUAndValidateExistsAndGetVersion(ctx context.Context, fs string, fsv *pdu.FilesystemVersion) (v zfs.FilesystemVersion, err error) { + sendArgs := uncheckedSendArgsFromPDU(fsv) + if sendArgs == nil { + return v, errors.New("must not be nil") } - if err := v.ValidateExists(ctx, fs); err != nil { - return nil, err + version, err := sendArgs.ValidateExistsAndGetVersion(ctx, fs) + if err != nil { + return v, err } - return v, nil + return version, nil } func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.StreamCopier, error) { @@ -182,7 +198,7 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.St return nil, nil, fmt.Errorf("unknown pdu.Tri variant %q", r.Encrypted) } - sendArgs := zfs.ZFSSendArgs{ + sendArgsUnvalidated := zfs.ZFSSendArgsUnvalidated{ FS: r.Filesystem, From: uncheckedSendArgsFromPDU(r.GetFrom()), // validated by zfs.ZFSSendDry / zfs.ZFSSend To: uncheckedSendArgsFromPDU(r.GetTo()), // validated by zfs.ZFSSendDry / zfs.ZFSSend @@ -190,6 +206,11 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.St ResumeToken: r.ResumeToken, // nil or not nil, depending on decoding success } + sendArgs, err := sendArgsUnvalidated.Validate(ctx) + if err != nil { + return nil, nil, errors.Wrap(err, "validate send arguments") + } + getLogger(ctx).Debug("acquire concurrent send semaphore") // TODO use try-acquire and fail with resource-exhaustion rpc status // => would require handling on the client-side @@ -224,7 +245,7 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.St // update replication cursor if sendArgs.From != nil { // For all but the first replication, this should always be a no-op because SendCompleted already moved the cursor - _, err = MoveReplicationCursor(ctx, sendArgs.FS, sendArgs.From, s.jobId) + _, err = MoveReplicationCursor(ctx, sendArgs.FS, sendArgs.FromVersion, s.jobId) if err == zfs.ErrBookmarkCloningNotSupported { getLogger(ctx).Debug("not creating replication cursor from bookmark because ZFS does not support it") // fallthrough @@ -235,18 +256,18 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.St // make sure `From` doesn't go away in order to make this step resumable if sendArgs.From != nil { - err := HoldStep(ctx, sendArgs.FS, sendArgs.From, s.jobId) + err := HoldStep(ctx, sendArgs.FS, *sendArgs.FromVersion, s.jobId) if err == zfs.ErrBookmarkCloningNotSupported { getLogger(ctx).Debug("not creating step bookmark because ZFS does not support it") // fallthrough } else if err != nil { - return nil, nil, errors.Wrap(err, "cannot create step bookmark") + return nil, nil, errors.Wrapf(err, "cannot hold `from` version %q before starting send", *sendArgs.FromVersion) } } // make sure `To` doesn't go away in order to make this step resumable - err = HoldStep(ctx, sendArgs.FS, sendArgs.To, s.jobId) + err = HoldStep(ctx, sendArgs.FS, sendArgs.ToVersion, s.jobId) if err != nil { - return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.To.RelName) + return nil, nil, errors.Wrapf(err, "cannot hold `to` version %q before starting send", sendArgs.ToVersion) } // step holds & replication cursor released / moved forward in s.SendCompleted => s.moveCursorAndReleaseSendHolds @@ -263,23 +284,24 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p fs := orig.GetFilesystem() var err error - var from *zfs.ZFSSendArgVersion + var from *zfs.FilesystemVersion if orig.GetFrom() != nil { - from, err = sendArgsFromPDUAndValidateExists(ctx, fs, orig.GetFrom()) // no shadow + f, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, orig.GetFrom()) // no shadow if err != nil { return nil, errors.Wrap(err, "validate `from` exists") } + from = &f } - to, err := sendArgsFromPDUAndValidateExists(ctx, fs, orig.GetTo()) + to, err := sendArgsFromPDUAndValidateExistsAndGetVersion(ctx, fs, orig.GetTo()) if err != nil { return nil, errors.Wrap(err, "validate `to` exists") } - log := getLogger(ctx).WithField("to_guid", to.GUID). + log := getLogger(ctx).WithField("to_guid", to.Guid). WithField("fs", fs). WithField("to", to.RelName) if from != nil { - log = log.WithField("from", from.RelName).WithField("from_guid", from.GUID) + log = log.WithField("from", from.RelName).WithField("from_guid", from.Guid) } log.Debug("move replication cursor to most recent common version") @@ -320,18 +342,14 @@ func (p *Sender) SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*p return } log.Debug("release step-hold of or step-bookmark on `from`") - err := ReleaseStep(ctx, fs, from, p.jobId) + err := ReleaseStep(ctx, fs, *from, p.jobId) if err != nil { if dne, ok := err.(*zfs.DatasetDoesNotExist); ok { // If bookmark cloning is not supported, `from` might be the old replication cursor // and thus have already been destroyed by MoveReplicationCursor above // In that case, nonexistence of `from` is not an error, otherwise it is. - fsp, err := zfs.NewDatasetPath(fs) - if err != nil { - panic(err) // fs has been validated multiple times above - } - for _, fsv := range destroyedCursors { - if fsv.ToAbsPath(fsp) == dne.Path { + for _, c := range destroyedCursors { + if c.GetFullPath() == dne.Path { log.Info("`from` was a replication cursor and has already been destroyed") return } @@ -610,9 +628,6 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs if to == nil { return nil, errors.New("`To` must not be nil") } - if err := to.ValidateInMemory(lp.ToString()); err != nil { - return nil, errors.Wrap(err, "`To` invalid") - } if !to.IsSnapshot() { return nil, errors.New("`To` must be a snapshot") } @@ -725,7 +740,8 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs } // validate that we actually received what the sender claimed - if err := to.ValidateExists(ctx, lp.ToString()); err != nil { + toRecvd, err := to.ValidateExistsAndGetVersion(ctx, lp.ToString()) + if err != nil { msg := "receive request's `To` version does not match what we received in the stream" getLogger(ctx).WithError(err).WithField("snap", snapFullPath).Error(msg) getLogger(ctx).Error("aborting recv request, but keeping received snapshot for inspection") @@ -734,7 +750,7 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs if s.conf.UpdateLastReceivedHold { getLogger(ctx).Debug("move last-received-hold") - if err := MoveLastReceivedHold(ctx, lp.ToString(), *to, s.conf.JobID); err != nil { + if err := MoveLastReceivedHold(ctx, lp.ToString(), toRecvd, s.conf.JobID); err != nil { return nil, errors.Wrap(err, "cannot move last-received-hold") } } diff --git a/endpoint/endpoint_zfs.go b/endpoint/endpoint_zfs.go deleted file mode 100644 index 3b83e93..0000000 --- a/endpoint/endpoint_zfs.go +++ /dev/null @@ -1,503 +0,0 @@ -package endpoint - -import ( - "context" - "fmt" - "regexp" - "sort" - - "github.com/kr/pretty" - "github.com/pkg/errors" - - "github.com/zrepl/zrepl/zfs" -) - -var stepHoldTagRE = regexp.MustCompile("^zrepl_STEP_J_(.+)") - -func StepHoldTag(jobid JobID) (string, error) { - return stepHoldTagImpl(jobid.String()) -} - -func stepHoldTagImpl(jobid string) (string, error) { - t := fmt.Sprintf("zrepl_STEP_J_%s", jobid) - if err := zfs.ValidHoldTag(t); err != nil { - return "", err - } - return t, nil -} - -// err != nil always means that the bookmark is not a step bookmark -func ParseStepHoldTag(tag string) (JobID, error) { - match := stepHoldTagRE.FindStringSubmatch(tag) - if match == nil { - return JobID{}, fmt.Errorf("parse hold tag: match regex %q", stepHoldTagRE) - } - jobID, err := MakeJobID(match[1]) - if err != nil { - return JobID{}, errors.Wrap(err, "parse hold tag: invalid job id field") - } - return jobID, nil -} - -const stepBookmarkNamePrefix = "zrepl_STEP" - -// v must be validated by caller -func StepBookmarkName(fs string, guid uint64, id JobID) (string, error) { - return stepBookmarkNameImpl(fs, guid, id.String()) -} - -func stepBookmarkNameImpl(fs string, guid uint64, jobid string) (string, error) { - return makeJobAndGuidBookmarkName(stepBookmarkNamePrefix, fs, guid, jobid) -} - -// name is the full bookmark name, including dataset path -// -// err != nil always means that the bookmark is not a step bookmark -func ParseStepBookmarkName(fullname string) (guid uint64, jobID JobID, err error) { - guid, jobID, err = parseJobAndGuidBookmarkName(fullname, stepBookmarkNamePrefix) - if err != nil { - err = errors.Wrap(err, "parse step bookmark name") // no shadow! - } - return guid, jobID, err -} - -const replicationCursorBookmarkNamePrefix = "zrepl_CURSOR" - -func ReplicationCursorBookmarkName(fs string, guid uint64, id JobID) (string, error) { - return replicationCursorBookmarkNameImpl(fs, guid, id.String()) -} - -func replicationCursorBookmarkNameImpl(fs string, guid uint64, jobid string) (string, error) { - return makeJobAndGuidBookmarkName(replicationCursorBookmarkNamePrefix, fs, guid, jobid) -} - -var ErrV1ReplicationCursor = fmt.Errorf("bookmark name is a v1-replication cursor") - -//err != nil always means that the bookmark is not a valid replication bookmark -// -// Returns ErrV1ReplicationCursor as error if the bookmark is a v1 replication cursor -func ParseReplicationCursorBookmarkName(fullname string) (uint64, JobID, error) { - - // check for legacy cursors - { - if err := zfs.EntityNamecheck(fullname, zfs.EntityTypeBookmark); err != nil { - return 0, JobID{}, errors.Wrap(err, "parse replication cursor bookmark name") - } - _, _, name, err := zfs.DecomposeVersionString(fullname) - if err != nil { - return 0, JobID{}, errors.Wrap(err, "parse replication cursor bookmark name: decompose version string") - } - const V1ReplicationCursorBookmarkName = "zrepl_replication_cursor" - if name == V1ReplicationCursorBookmarkName { - return 0, JobID{}, ErrV1ReplicationCursor - } - } - - guid, jobID, err := parseJobAndGuidBookmarkName(fullname, replicationCursorBookmarkNamePrefix) - if err != nil { - err = errors.Wrap(err, "parse replication cursor bookmark name") // no shadow - } - return guid, jobID, err -} - -// may return nil for both values, indicating there is no cursor -func GetMostRecentReplicationCursorOfJob(ctx context.Context, fs string, jobID JobID) (*zfs.FilesystemVersion, error) { - fsp, err := zfs.NewDatasetPath(fs) - if err != nil { - return nil, err - } - candidates, err := GetReplicationCursors(ctx, fsp, jobID) - if err != nil || len(candidates) == 0 { - return nil, err - } - - sort.Slice(candidates, func(i, j int) bool { - return candidates[i].CreateTXG < candidates[j].CreateTXG - }) - - mostRecent := candidates[len(candidates)-1] - return &mostRecent, nil -} - -func GetReplicationCursors(ctx context.Context, fs *zfs.DatasetPath, jobID JobID) ([]zfs.FilesystemVersion, error) { - - listOut := &ListHoldsAndBookmarksOutput{} - if err := listZFSHoldsAndBookmarksImplFS(ctx, listOut, fs); err != nil { - return nil, errors.Wrap(err, "get replication cursor: list bookmarks and holds") - } - - if len(listOut.V1ReplicationCursors) > 0 { - getLogger(ctx).WithField("bookmark", pretty.Sprint(listOut.V1ReplicationCursors)). - Warn("found v1-replication cursor bookmarks, consider running migration 'replication-cursor:v1-v2' after successful replication with this zrepl version") - } - - candidates := make([]zfs.FilesystemVersion, 0) - for _, v := range listOut.ReplicationCursorBookmarks { - zv := zfs.ZFSSendArgVersion{ - RelName: "#" + v.Name, - GUID: v.Guid, - } - if err := zv.ValidateExists(ctx, v.FS); err != nil { - getLogger(ctx).WithError(err).WithField("bookmark", zv.FullPath(v.FS)). - Error("found invalid replication cursor bookmark") - continue - } - candidates = append(candidates, v.v) - } - - return candidates, nil -} - -// `target` is validated before replication cursor is set. if validation fails, the cursor is not moved. -// -// returns ErrBookmarkCloningNotSupported if version is a bookmark and bookmarking bookmarks is not supported by ZFS -func MoveReplicationCursor(ctx context.Context, fs string, target *zfs.ZFSSendArgVersion, jobID JobID) (destroyedCursors []zfs.FilesystemVersion, err error) { - - if !target.IsSnapshot() { - return nil, zfs.ErrBookmarkCloningNotSupported - } - - snapProps, err := target.ValidateExistsAndGetCheckedProps(ctx, fs) - if err != nil { - return nil, errors.Wrapf(err, "invalid replication cursor target %q (guid=%v)", target.RelName, target.GUID) - } - - bookmarkname, err := ReplicationCursorBookmarkName(fs, snapProps.Guid, jobID) - if err != nil { - return nil, errors.Wrap(err, "determine replication cursor name") - } - - // idempotently create bookmark (guid is encoded in it, hence we'll most likely add a new one - // cleanup the old one afterwards - - err = zfs.ZFSBookmark(fs, *target, bookmarkname) - if err != nil { - if err == zfs.ErrBookmarkCloningNotSupported { - return nil, err // TODO go1.13 use wrapping - } - return nil, errors.Wrapf(err, "cannot create bookmark") - } - - destroyedCursors, err = DestroyObsoleteReplicationCursors(ctx, fs, target, jobID) - if err != nil { - return nil, errors.Wrap(err, "destroy obsolete replication cursors") - } - - return destroyedCursors, nil -} - -func DestroyObsoleteReplicationCursors(ctx context.Context, fs string, target *zfs.ZFSSendArgVersion, jobID JobID) (destroyed []zfs.FilesystemVersion, err error) { - return destroyBookmarksOlderThan(ctx, fs, target, jobID, func(shortname string) (accept bool) { - _, parsedID, err := ParseReplicationCursorBookmarkName(fs + "#" + shortname) - return err == nil && parsedID == jobID - }) -} - -// idempotently hold / step-bookmark `version` -// -// returns ErrBookmarkCloningNotSupported if version is a bookmark and bookmarking bookmarks is not supported by ZFS -func HoldStep(ctx context.Context, fs string, v *zfs.ZFSSendArgVersion, jobID JobID) error { - if err := v.ValidateExists(ctx, fs); err != nil { - return err - } - if v.IsSnapshot() { - - tag, err := StepHoldTag(jobID) - if err != nil { - return errors.Wrap(err, "step hold tag") - } - - if err := zfs.ZFSHold(ctx, fs, *v, tag); err != nil { - return errors.Wrap(err, "step hold: zfs") - } - - return nil - } - - v.MustBeBookmark() - - bmname, err := StepBookmarkName(fs, v.GUID, jobID) - if err != nil { - return errors.Wrap(err, "create step bookmark: determine bookmark name") - } - // idempotently create bookmark - err = zfs.ZFSBookmark(fs, *v, bmname) - if err != nil { - if err == zfs.ErrBookmarkCloningNotSupported { - // TODO we could actually try to find a local snapshot that has the requested GUID - // however, the replication algorithm prefers snapshots anyways, so this quest - // is most likely not going to be successful. Also, there's the possibility that - // the caller might want to filter what snapshots are eligible, and this would - // complicate things even further. - return err // TODO go1.13 use wrapping - } - return errors.Wrap(err, "create step bookmark: zfs") - } - return nil -} - -// idempotently release the step-hold on v if v is a snapshot -// or idempotently destroy the step-bookmark of v if v is a bookmark -// -// note that this operation leaves v itself untouched, unless v is the step-bookmark itself, in which case v is destroyed -// -// returns an instance of *zfs.DatasetDoesNotExist if `v` does not exist -func ReleaseStep(ctx context.Context, fs string, v *zfs.ZFSSendArgVersion, jobID JobID) error { - - if err := v.ValidateExists(ctx, fs); err != nil { - return err - } - - if v.IsSnapshot() { - tag, err := StepHoldTag(jobID) - if err != nil { - return errors.Wrap(err, "step release tag") - } - - if err := zfs.ZFSRelease(ctx, tag, v.FullPath(fs)); err != nil { - return errors.Wrap(err, "step release: zfs") - } - - return nil - } - - v.MustBeBookmark() - - bmname, err := StepBookmarkName(fs, v.GUID, jobID) - if err != nil { - return errors.Wrap(err, "step release: determine bookmark name") - } - // idempotently destroy bookmark - - if err := zfs.ZFSDestroyIdempotent(bmname); err != nil { - return errors.Wrap(err, "step release: bookmark destroy: zfs") - } - - return nil -} - -// release {step holds, step bookmarks} earlier and including `mostRecent` -func ReleaseStepAll(ctx context.Context, fs string, mostRecent *zfs.ZFSSendArgVersion, jobID JobID) error { - - if err := mostRecent.ValidateInMemory(fs); err != nil { - return err - } - - tag, err := StepHoldTag(jobID) - if err != nil { - return errors.Wrap(err, "step release all: tag") - } - - err = zfs.ZFSReleaseAllOlderAndIncludingGUID(ctx, fs, mostRecent.GUID, tag) - if err != nil { - return errors.Wrapf(err, "step release all: release holds older and including %q", mostRecent.FullPath(fs)) - } - - _, err = destroyBookmarksOlderThan(ctx, fs, mostRecent, jobID, func(shortname string) bool { - _, parsedId, parseErr := ParseStepBookmarkName(fs + "#" + shortname) - return parseErr == nil && parsedId == jobID - }) - if err != nil { - return errors.Wrapf(err, "step release all: destroy bookmarks older than %q", mostRecent.FullPath(fs)) - } - - return nil -} - -var lastReceivedHoldTagRE = regexp.MustCompile("^zrepl_last_received_J_(.+)$") - -// err != nil always means that the bookmark is not a step bookmark -func ParseLastReceivedHoldTag(tag string) (JobID, error) { - match := lastReceivedHoldTagRE.FindStringSubmatch(tag) - if match == nil { - return JobID{}, errors.Errorf("parse last-received-hold tag: does not match regex %s", lastReceivedHoldTagRE.String()) - } - jobId, err := MakeJobID(match[1]) - if err != nil { - return JobID{}, errors.Wrap(err, "parse last-received-hold tag: invalid job id field") - } - return jobId, nil -} - -func LastReceivedHoldTag(jobID JobID) (string, error) { - return lastReceivedHoldImpl(jobID.String()) -} - -func lastReceivedHoldImpl(jobid string) (string, error) { - tag := fmt.Sprintf("zrepl_last_received_J_%s", jobid) - if err := zfs.ValidHoldTag(tag); err != nil { - return "", err - } - return tag, nil -} - -func MoveLastReceivedHold(ctx context.Context, fs string, to zfs.ZFSSendArgVersion, jobID JobID) error { - if err := to.ValidateExists(ctx, fs); err != nil { - return err - } - if err := zfs.EntityNamecheck(to.FullPath(fs), zfs.EntityTypeSnapshot); err != nil { - return err - } - - tag, err := LastReceivedHoldTag(jobID) - if err != nil { - return errors.Wrap(err, "last-received-hold: hold tag") - } - - // we never want to be without a hold - // => hold new one before releasing old hold - - err = zfs.ZFSHold(ctx, fs, to, tag) - if err != nil { - return errors.Wrap(err, "last-received-hold: hold newly received") - } - - err = zfs.ZFSReleaseAllOlderThanGUID(ctx, fs, to.GUID, tag) - if err != nil { - return errors.Wrap(err, "last-received-hold: release older holds") - } - - return nil -} - -type ListHoldsAndBookmarksOutputBookmarkV1ReplicationCursor struct { - FS string - Name string -} - -type ListHoldsAndBookmarksOutput struct { - StepBookmarks []*ListHoldsAndBookmarksOutputBookmark - StepHolds []*ListHoldsAndBookmarksOutputHold - - ReplicationCursorBookmarks []*ListHoldsAndBookmarksOutputBookmark - V1ReplicationCursors []*ListHoldsAndBookmarksOutputBookmarkV1ReplicationCursor - LastReceivedHolds []*ListHoldsAndBookmarksOutputHold -} - -type ListHoldsAndBookmarksOutputBookmark struct { - FS, Name string - Guid uint64 - JobID JobID - v zfs.FilesystemVersion -} - -type ListHoldsAndBookmarksOutputHold struct { - FS string - Snap string - SnapGuid uint64 - SnapCreateTXG uint64 - Tag string - JobID JobID -} - -// List all holds and bookmarks managed by endpoint -func ListZFSHoldsAndBookmarks(ctx context.Context, fsfilter zfs.DatasetFilter) (*ListHoldsAndBookmarksOutput, error) { - - // initialize all fields so that JSON serialization of output looks pretty (see client/holds.go) - // however, listZFSHoldsAndBookmarksImplFS shouldn't rely on it - out := &ListHoldsAndBookmarksOutput{ - StepBookmarks: make([]*ListHoldsAndBookmarksOutputBookmark, 0), - StepHolds: make([]*ListHoldsAndBookmarksOutputHold, 0), - ReplicationCursorBookmarks: make([]*ListHoldsAndBookmarksOutputBookmark, 0), - V1ReplicationCursors: make([]*ListHoldsAndBookmarksOutputBookmarkV1ReplicationCursor, 0), - LastReceivedHolds: make([]*ListHoldsAndBookmarksOutputHold, 0), - } - - fss, err := zfs.ZFSListMapping(ctx, fsfilter) - if err != nil { - return nil, errors.Wrap(err, "list filesystems") - } - - for _, fs := range fss { - err := listZFSHoldsAndBookmarksImplFS(ctx, out, fs) - if err != nil { - return nil, errors.Wrapf(err, "list holds and bookmarks on %q", fs.ToString()) - } - } - return out, nil -} - -func listZFSHoldsAndBookmarksImplFS(ctx context.Context, out *ListHoldsAndBookmarksOutput, fs *zfs.DatasetPath) error { - fsvs, err := zfs.ZFSListFilesystemVersions(fs, nil) - if err != nil { - return errors.Wrapf(err, "list filesystem versions of %q", fs) - } - for _, v := range fsvs { - switch v.Type { - case zfs.Bookmark: - listZFSHoldsAndBookmarksImplTryParseBookmark(ctx, out, fs, v) - case zfs.Snapshot: - holds, err := zfs.ZFSHolds(ctx, fs.ToString(), v.Name) - if err != nil { - return errors.Wrapf(err, "get holds of %q", v.ToAbsPath(fs)) - } - for _, tag := range holds { - listZFSHoldsAndBookmarksImplSnapshotTryParseHold(ctx, out, fs, v, tag) - } - default: - continue - } - } - return nil -} - -// pure function, err != nil always indicates parsing error -func listZFSHoldsAndBookmarksImplTryParseBookmark(ctx context.Context, out *ListHoldsAndBookmarksOutput, fs *zfs.DatasetPath, v zfs.FilesystemVersion) { - var err error - - if v.Type != zfs.Bookmark { - panic("impl error") - } - - fullname := v.ToAbsPath(fs) - - bm := &ListHoldsAndBookmarksOutputBookmark{ - FS: fs.ToString(), Name: v.Name, v: v, - } - bm.Guid, bm.JobID, err = ParseStepBookmarkName(fullname) - if err == nil { - out.StepBookmarks = append(out.StepBookmarks, bm) - return - } - - bm.Guid, bm.JobID, err = ParseReplicationCursorBookmarkName(fullname) - if err == nil { - out.ReplicationCursorBookmarks = append(out.ReplicationCursorBookmarks, bm) - return - } else if err == ErrV1ReplicationCursor { - v1rc := &ListHoldsAndBookmarksOutputBookmarkV1ReplicationCursor{ - FS: fs.ToString(), Name: v.Name, - } - out.V1ReplicationCursors = append(out.V1ReplicationCursors, v1rc) - return - } -} - -// pure function, err != nil always indicates parsing error -func listZFSHoldsAndBookmarksImplSnapshotTryParseHold(ctx context.Context, out *ListHoldsAndBookmarksOutput, fs *zfs.DatasetPath, v zfs.FilesystemVersion, holdTag string) { - var err error - - if v.Type != zfs.Snapshot { - panic("impl error") - } - - hold := &ListHoldsAndBookmarksOutputHold{ - FS: fs.ToString(), - Snap: v.Name, - SnapGuid: v.Guid, - SnapCreateTXG: v.CreateTXG, - Tag: holdTag, - } - hold.JobID, err = ParseStepHoldTag(holdTag) - if err == nil { - out.StepHolds = append(out.StepHolds, hold) - return - } - - hold.JobID, err = ParseLastReceivedHoldTag(holdTag) - if err == nil { - out.LastReceivedHolds = append(out.LastReceivedHolds, hold) - return - } - -} diff --git a/endpoint/endpoint_zfs_abstraction.go b/endpoint/endpoint_zfs_abstraction.go new file mode 100644 index 0000000..c4d88d5 --- /dev/null +++ b/endpoint/endpoint_zfs_abstraction.go @@ -0,0 +1,530 @@ +package endpoint + +import ( + "context" + "encoding/json" + "fmt" + "sort" + "strings" + + "github.com/pkg/errors" + "github.com/zrepl/zrepl/util/envconst" + "github.com/zrepl/zrepl/zfs" +) + +type AbstractionType string + +// Implementation note: +// There are a lot of exhaustive switches on AbstractionType in the code base. +// When adding a new abstraction type, make sure to search and update them! +const ( + AbstractionStepBookmark AbstractionType = "step-bookmark" + AbstractionStepHold AbstractionType = "step-hold" + AbstractionLastReceivedHold AbstractionType = "last-received-hold" + AbstractionReplicationCursorBookmarkV1 AbstractionType = "replication-cursor-bookmark-v1" + AbstractionReplicationCursorBookmarkV2 AbstractionType = "replication-cursor-bookmark-v2" +) + +var AbstractionTypesAll = map[AbstractionType]bool{ + AbstractionStepBookmark: true, + AbstractionStepHold: true, + AbstractionLastReceivedHold: true, + AbstractionReplicationCursorBookmarkV1: true, + AbstractionReplicationCursorBookmarkV2: true, +} + +// Implementation Note: +// Whenever you add a new accessor, adjust AbstractionJSON.MarshalJSON accordingly +type Abstraction interface { + GetType() AbstractionType + GetFS() string + GetName() string + GetFullPath() string + GetJobID() *JobID // may return nil if the abstraction does not have a JobID + GetCreateTXG() uint64 + GetFilesystemVersion() zfs.FilesystemVersion + String() string + // destroy the abstraction: either releases the hold or destroys the bookmark + Destroy(context.Context) error + json.Marshaler +} + +func (t AbstractionType) Validate() error { + switch t { + case AbstractionStepBookmark: + return nil + case AbstractionStepHold: + return nil + case AbstractionLastReceivedHold: + return nil + case AbstractionReplicationCursorBookmarkV1: + return nil + case AbstractionReplicationCursorBookmarkV2: + return nil + default: + return errors.Errorf("unknown abstraction type %q", t) + } +} + +func (t AbstractionType) MustValidate() error { + if err := t.Validate(); err != nil { + panic(err) + } + return nil +} + +// Number of instances of this abstraction type that are live (not stale) +// per (FS,JobID). -1 for infinity. +func (t AbstractionType) NumLivePerFsAndJob() int { + switch t { + case AbstractionStepBookmark: + return 2 + case AbstractionStepHold: + return 2 + case AbstractionLastReceivedHold: + return 1 + case AbstractionReplicationCursorBookmarkV1: + return -1 + case AbstractionReplicationCursorBookmarkV2: + return 1 + default: + panic(t) + } +} + +type AbstractionJSON struct{ Abstraction } + +var _ json.Marshaler = (*AbstractionJSON)(nil) + +func (a AbstractionJSON) MarshalJSON() ([]byte, error) { + type S struct { + Type AbstractionType + FS string + Name string + FullPath string + JobID *JobID // may return nil if the abstraction does not have a JobID + CreateTXG uint64 + FilesystemVersion zfs.FilesystemVersion + String string + } + v := S{ + Type: a.Abstraction.GetType(), + FS: a.Abstraction.GetFS(), + Name: a.Abstraction.GetName(), + FullPath: a.Abstraction.GetFullPath(), + JobID: a.Abstraction.GetJobID(), + CreateTXG: a.Abstraction.GetCreateTXG(), + FilesystemVersion: a.Abstraction.GetFilesystemVersion(), + String: a.Abstraction.String(), + } + return json.Marshal(v) +} + +type AbstractionTypeSet map[AbstractionType]bool + +func AbstractionTypeSetFromStrings(sts []string) (AbstractionTypeSet, error) { + ats := make(map[AbstractionType]bool, len(sts)) + for i, t := range sts { + at := AbstractionType(t) + if err := at.Validate(); err != nil { + return nil, errors.Wrapf(err, "invalid abstraction type #%d %q", i+1, t) + } + ats[at] = true + } + return ats, nil +} + +func (s AbstractionTypeSet) String() string { + sts := make([]string, 0, len(s)) + for i := range s { + sts = append(sts, string(i)) + } + sts = sort.StringSlice(sts) + return strings.Join(sts, ",") +} + +func (s AbstractionTypeSet) Validate() error { + for k := range s { + if err := k.Validate(); err != nil { + return err + } + } + return nil +} + +type BookmarkExtractor func(fs *zfs.DatasetPath, v zfs.FilesystemVersion) Abstraction + +// returns nil if the abstraction type is not bookmark-based +func (t AbstractionType) BookmarkExtractor() BookmarkExtractor { + switch t { + case AbstractionStepBookmark: + return StepBookmarkExtractor + case AbstractionReplicationCursorBookmarkV1: + return ReplicationCursorV1Extractor + case AbstractionReplicationCursorBookmarkV2: + return ReplicationCursorV2Extractor + case AbstractionStepHold: + return nil + case AbstractionLastReceivedHold: + return nil + default: + panic(fmt.Sprintf("unimpl: %q", t)) + } +} + +type HoldExtractor = func(fs *zfs.DatasetPath, v zfs.FilesystemVersion, tag string) Abstraction + +// returns nil if the abstraction type is not hold-based +func (t AbstractionType) HoldExtractor() HoldExtractor { + switch t { + case AbstractionStepBookmark: + return nil + case AbstractionReplicationCursorBookmarkV1: + return nil + case AbstractionReplicationCursorBookmarkV2: + return nil + case AbstractionStepHold: + return StepHoldExtractor + case AbstractionLastReceivedHold: + return LastReceivedHoldExtractor + default: + panic(fmt.Sprintf("unimpl: %q", t)) + } +} + +type ListZFSHoldsAndBookmarksQuery struct { + FS ListZFSHoldsAndBookmarksQueryFilesystemFilter + // What abstraction types should match (any contained in the set) + What AbstractionTypeSet + + // The output for the query must satisfy _all_ (AND) requirements of all fields in this query struct. + + // if not nil: JobID of the hold or bookmark in question must be equal + // else: JobID of the hold or bookmark can be any value + JobID *JobID + // if not nil: The hold's snapshot or the bookmark's createtxg must be less than (or equal) Until + // else: CreateTXG of the hold or bookmark can be any value + Until *InclusiveExclusiveCreateTXG + + // TODO + // Concurrent: uint > 0 +} + +type InclusiveExclusiveCreateTXG struct { + CreateTXG uint64 + Inclusive *zfs.NilBool // must not be nil +} + +// FS == nil XOR Filter == nil +type ListZFSHoldsAndBookmarksQueryFilesystemFilter struct { + FS *string + Filter zfs.DatasetFilter +} + +func (q *ListZFSHoldsAndBookmarksQuery) Validate() error { + if err := q.FS.Validate(); err != nil { + return errors.Wrap(err, "FS") + } + if q.JobID != nil { + q.JobID.MustValidate() // FIXME + } + if q.Until != nil { + if err := q.Until.Validate(); err != nil { + return errors.Wrap(err, "Until") + } + } + if err := q.What.Validate(); err != nil { + return err + } + return nil +} + +var zreplEndpointListAbstractionsQueryCreatetxg0Allowed = envconst.Bool("ZREPL_ENDPOINT_LIST_ABSTRACTIONS_QUERY_CREATETXG_0_ALLOWED", false) + +func (i *InclusiveExclusiveCreateTXG) Validate() error { + if err := i.Inclusive.Validate(); err != nil { + return errors.Wrap(err, "Inclusive") + } + if i.CreateTXG == 0 && !zreplEndpointListAbstractionsQueryCreatetxg0Allowed { + return errors.New("CreateTXG must be non-zero") + } + return nil + +} + +func (f *ListZFSHoldsAndBookmarksQueryFilesystemFilter) Validate() error { + if f == nil { + return nil + } + fsSet := f.FS != nil + filterSet := f.Filter != nil + if fsSet && filterSet || !fsSet && !filterSet { + return fmt.Errorf("must set FS or Filter field, but fsIsSet=%v and filterIsSet=%v", fsSet, filterSet) + } + if fsSet { + if err := zfs.EntityNamecheck(*f.FS, zfs.EntityTypeFilesystem); err != nil { + return errors.Wrap(err, "FS invalid") + } + } + return nil +} + +func (f *ListZFSHoldsAndBookmarksQueryFilesystemFilter) Filesystems(ctx context.Context) ([]string, error) { + if err := f.Validate(); err != nil { + panic(err) + } + if f.FS != nil { + return []string{*f.FS}, nil + } + if f.Filter != nil { + dps, err := zfs.ZFSListMapping(ctx, f.Filter) + if err != nil { + return nil, err + } + fss := make([]string, len(dps)) + for i, dp := range dps { + fss[i] = dp.ToString() + } + return fss, nil + } + panic("unreachable") +} + +type ListAbstractionsError struct { + FS string + Snap string + What string + Err error +} + +func (e ListAbstractionsError) Error() string { + if e.FS == "" { + return fmt.Sprintf("list endpoint abstractions: %s: %s", e.What, e.Err) + } else { + v := e.FS + if e.Snap != "" { + v = fmt.Sprintf("%s@%s", e.FS, e.Snap) + } + return fmt.Sprintf("list endpoint abstractions on %q: %s: %s", v, e.What, e.Err) + } +} + +type putListAbstractionErr func(err error, fs string, what string) +type putListAbstraction func(a Abstraction) + +type ListAbstractionsErrors []ListAbstractionsError + +func (e ListAbstractionsErrors) Error() string { + if len(e) == 0 { + panic(e) + } + if len(e) == 1 { + return fmt.Sprintf("list endpoint abstractions: %s", e[0]) + } + msgs := make([]string, len(e)) + for i := range e { + msgs[i] = e.Error() + } + return fmt.Sprintf("list endpoint abstractions: multiple errors:\n%s", strings.Join(msgs, "\n")) +} + +func ListAbstractions(ctx context.Context, query ListZFSHoldsAndBookmarksQuery) (out []Abstraction, outErrs []ListAbstractionsError, err error) { + // impl note: structure the query processing in such a way that + // a minimum amount of zfs shell-outs needs to be done + + if err := query.Validate(); err != nil { + return nil, nil, errors.Wrap(err, "validate query") + } + + fss, err := query.FS.Filesystems(ctx) + if err != nil { + return nil, nil, errors.Wrap(err, "list filesystems") + } + + errCb := func(err error, fs string, what string) { + outErrs = append(outErrs, ListAbstractionsError{Err: err, FS: fs, What: what}) + } + emitAbstraction := func(a Abstraction) { + jobIdMatches := query.JobID == nil || a.GetJobID() == nil || *a.GetJobID() == *query.JobID + + untilMatches := query.Until == nil + if query.Until != nil { + if query.Until.Inclusive.B { + untilMatches = a.GetCreateTXG() <= query.Until.CreateTXG + } else { + untilMatches = a.GetCreateTXG() < query.Until.CreateTXG + } + } + + if jobIdMatches && untilMatches { + out = append(out, a) + } + } + for _, fs := range fss { + listAbstractionsImplFS(ctx, fs, &query, emitAbstraction, errCb) + } + + return out, outErrs, nil + +} + +func listAbstractionsImplFS(ctx context.Context, fs string, query *ListZFSHoldsAndBookmarksQuery, emitCandidate putListAbstraction, errCb putListAbstractionErr) { + fsp, err := zfs.NewDatasetPath(fs) + if err != nil { + panic(err) + } + + if len(query.What) == 0 { + return + } + + // we need filesystem versions for any abstraction type + fsvs, err := zfs.ZFSListFilesystemVersions(fsp, nil) + if err != nil { + errCb(err, fs, "list filesystem versions") + return + } + + for at := range query.What { + bmE := at.BookmarkExtractor() + holdE := at.HoldExtractor() + if bmE == nil && holdE == nil || bmE != nil && holdE != nil { + panic("implementation error: extractors misconfigured for " + at) + } + for _, v := range fsvs { + var a Abstraction + if v.Type == zfs.Bookmark && bmE != nil { + a = bmE(fsp, v) + } + if v.Type == zfs.Snapshot && holdE != nil { + holds, err := zfs.ZFSHolds(ctx, fsp.ToString(), v.Name) + if err != nil { + errCb(err, v.ToAbsPath(fsp), "get hold on snap") + continue + } + for _, tag := range holds { + a = holdE(fsp, v, tag) + } + } + if a != nil { + emitCandidate(a) + } + } + } +} + +type BatchDestroyResult struct { + Abstraction + DestroyErr error +} + +var _ json.Marshaler = (*BatchDestroyResult)(nil) + +func (r BatchDestroyResult) MarshalJSON() ([]byte, error) { + err := "" + if r.DestroyErr != nil { + err = r.DestroyErr.Error() + } + s := struct { + Abstraction AbstractionJSON + DestroyErr string + }{ + AbstractionJSON{r.Abstraction}, + err, + } + return json.Marshal(s) +} + +func BatchDestroy(ctx context.Context, abs []Abstraction) <-chan BatchDestroyResult { + // hold-based batching: per snapshot + // bookmark-based batching: none possible via CLI + // => not worth the trouble for now, will be worth it once we start using channel programs + // => TODO: actual batching using channel programs + res := make(chan BatchDestroyResult, len(abs)) + go func() { + for _, a := range abs { + res <- BatchDestroyResult{ + a, + a.Destroy(ctx), + } + } + close(res) + }() + return res +} + +type StalenessInfo struct { + ConstructedWithQuery ListZFSHoldsAndBookmarksQuery + All []Abstraction + Live []Abstraction + Stale []Abstraction +} + +func ListStale(ctx context.Context, q ListZFSHoldsAndBookmarksQuery) (*StalenessInfo, error) { + if q.Until != nil { + // we must determine the most recent step per FS, can't allow that + return nil, errors.New("ListStale cannot have Until != nil set on query") + } + + abs, absErr, err := ListAbstractions(ctx, q) + if err != nil { + return nil, err + } + if len(absErr) > 0 { + // can't go on here because we can't determine the most recent step + return nil, ListAbstractionsErrors(absErr) + } + si := listStaleFiltering(abs) + si.ConstructedWithQuery = q + return si, nil +} + +// The last AbstractionType.NumLive() step holds per (FS,Job,AbstractionType) are live +// others are stale. +// +// the returned StalenessInfo.ConstructedWithQuery is not set +func listStaleFiltering(abs []Abstraction) *StalenessInfo { + + type fsAjobAtype struct { + FS string + Job JobID + Type AbstractionType + } + var noJobId []Abstraction + by := make(map[fsAjobAtype][]Abstraction) + for _, a := range abs { + if a.GetJobID() == nil { + noJobId = append(noJobId, a) + continue + } + faj := fsAjobAtype{a.GetFS(), *a.GetJobID(), a.GetType()} + l := by[faj] + l = append(l, a) + by[faj] = l + } + + ret := &StalenessInfo{ + All: abs, + Live: noJobId, + Stale: []Abstraction{}, + } + + // sort descending (highest createtxg first), then cut off + for k := range by { + l := by[k] + sort.Slice(l, func(i, j int) bool { + return l[i].GetCreateTXG() > l[j].GetCreateTXG() + }) + + cutoff := k.Type.NumLivePerFsAndJob() + if cutoff == -1 || len(l) <= cutoff { + ret.Live = append(ret.Live, l...) + } else { + ret.Live = append(ret.Live, l[0:cutoff]...) + ret.Stale = append(ret.Stale, l[cutoff:]...) + } + } + + return ret + +} diff --git a/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go b/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go new file mode 100644 index 0000000..b4b42ba --- /dev/null +++ b/endpoint/endpoint_zfs_abstraction_cursor_and_last_received_hold.go @@ -0,0 +1,377 @@ +package endpoint + +import ( + "context" + "encoding/json" + "fmt" + "regexp" + "sort" + + "github.com/kr/pretty" + "github.com/pkg/errors" + "github.com/zrepl/zrepl/util/errorarray" + "github.com/zrepl/zrepl/zfs" +) + +const replicationCursorBookmarkNamePrefix = "zrepl_CURSOR" + +func ReplicationCursorBookmarkName(fs string, guid uint64, id JobID) (string, error) { + return replicationCursorBookmarkNameImpl(fs, guid, id.String()) +} + +func replicationCursorBookmarkNameImpl(fs string, guid uint64, jobid string) (string, error) { + return makeJobAndGuidBookmarkName(replicationCursorBookmarkNamePrefix, fs, guid, jobid) +} + +var ErrV1ReplicationCursor = fmt.Errorf("bookmark name is a v1-replication cursor") + +//err != nil always means that the bookmark is not a valid replication bookmark +// +// Returns ErrV1ReplicationCursor as error if the bookmark is a v1 replication cursor +func ParseReplicationCursorBookmarkName(fullname string) (uint64, JobID, error) { + + // check for legacy cursors + { + if err := zfs.EntityNamecheck(fullname, zfs.EntityTypeBookmark); err != nil { + return 0, JobID{}, errors.Wrap(err, "parse replication cursor bookmark name") + } + _, _, name, err := zfs.DecomposeVersionString(fullname) + if err != nil { + return 0, JobID{}, errors.Wrap(err, "parse replication cursor bookmark name: decompose version string") + } + const V1ReplicationCursorBookmarkName = "zrepl_replication_cursor" + if name == V1ReplicationCursorBookmarkName { + return 0, JobID{}, ErrV1ReplicationCursor + } + // fallthrough to main parser + } + + guid, jobID, err := parseJobAndGuidBookmarkName(fullname, replicationCursorBookmarkNamePrefix) + if err != nil { + err = errors.Wrap(err, "parse replication cursor bookmark name") // no shadow + } + return guid, jobID, err +} + +// may return nil for both values, indicating there is no cursor +func GetMostRecentReplicationCursorOfJob(ctx context.Context, fs string, jobID JobID) (*zfs.FilesystemVersion, error) { + fsp, err := zfs.NewDatasetPath(fs) + if err != nil { + return nil, err + } + candidates, err := GetReplicationCursors(ctx, fsp, jobID) + if err != nil || len(candidates) == 0 { + return nil, err + } + + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].CreateTXG < candidates[j].CreateTXG + }) + + mostRecent := candidates[len(candidates)-1] + return &mostRecent, nil +} + +func GetReplicationCursors(ctx context.Context, dp *zfs.DatasetPath, jobID JobID) ([]zfs.FilesystemVersion, error) { + + fs := dp.ToString() + q := ListZFSHoldsAndBookmarksQuery{ + FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{FS: &fs}, + What: map[AbstractionType]bool{ + AbstractionReplicationCursorBookmarkV1: true, + AbstractionReplicationCursorBookmarkV2: true, + }, + JobID: &jobID, + Until: nil, + } + abs, absErr, err := ListAbstractions(ctx, q) + if err != nil { + return nil, errors.Wrap(err, "get replication cursor: list bookmarks and holds") + } + if len(absErr) > 0 { + return nil, ListAbstractionsErrors(absErr) + } + + var v1, v2 []Abstraction + for _, a := range abs { + switch a.GetType() { + case AbstractionReplicationCursorBookmarkV1: + v1 = append(v1, a) + case AbstractionReplicationCursorBookmarkV2: + v2 = append(v2, a) + default: + panic("unexpected abstraction: " + a.GetType()) + } + } + + if len(v1) > 0 { + getLogger(ctx).WithField("bookmark", pretty.Sprint(v1)). + Warn("found v1-replication cursor bookmarks, consider running migration 'replication-cursor:v1-v2' after successful replication with this zrepl version") + } + + candidates := make([]zfs.FilesystemVersion, 0) + for _, v := range v2 { + candidates = append(candidates, v.GetFilesystemVersion()) + } + + return candidates, nil +} + +type ReplicationCursorTarget interface { + IsSnapshot() bool + GetGuid() uint64 + GetCreateTXG() uint64 + ToSendArgVersion() zfs.ZFSSendArgVersion +} + +// `target` is validated before replication cursor is set. if validation fails, the cursor is not moved. +// +// returns ErrBookmarkCloningNotSupported if version is a bookmark and bookmarking bookmarks is not supported by ZFS +func MoveReplicationCursor(ctx context.Context, fs string, target ReplicationCursorTarget, jobID JobID) (destroyedCursors []Abstraction, err error) { + + if !target.IsSnapshot() { + return nil, zfs.ErrBookmarkCloningNotSupported + } + + bookmarkname, err := ReplicationCursorBookmarkName(fs, target.GetGuid(), jobID) + if err != nil { + return nil, errors.Wrap(err, "determine replication cursor name") + } + + // idempotently create bookmark (guid is encoded in it, hence we'll most likely add a new one + // cleanup the old one afterwards + + err = zfs.ZFSBookmark(fs, target.ToSendArgVersion(), bookmarkname) + if err != nil { + if err == zfs.ErrBookmarkCloningNotSupported { + return nil, err // TODO go1.13 use wrapping + } + return nil, errors.Wrapf(err, "cannot create bookmark") + } + + destroyedCursors, err = DestroyObsoleteReplicationCursors(ctx, fs, target, jobID) + if err != nil { + return nil, errors.Wrap(err, "destroy obsolete replication cursors") + } + + return destroyedCursors, nil +} + +type ReplicationCursor interface { + GetCreateTXG() uint64 +} + +func DestroyObsoleteReplicationCursors(ctx context.Context, fs string, current ReplicationCursor, jobID JobID) (_ []Abstraction, err error) { + + q := ListZFSHoldsAndBookmarksQuery{ + FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{ + FS: &fs, + }, + What: AbstractionTypeSet{ + AbstractionReplicationCursorBookmarkV2: true, + }, + JobID: &jobID, + Until: &InclusiveExclusiveCreateTXG{ + CreateTXG: current.GetCreateTXG(), + Inclusive: &zfs.NilBool{B: false}, + }, + } + abs, absErr, err := ListAbstractions(ctx, q) + if err != nil { + return nil, errors.Wrap(err, "list abstractions") + } + if len(absErr) > 0 { + return nil, errors.Wrap(ListAbstractionsErrors(absErr), "list abstractions") + } + + var destroyed []Abstraction + var errs []error + for res := range BatchDestroy(ctx, abs) { + log := getLogger(ctx). + WithField("replication_cursor_bookmark", res.Abstraction) + if res.DestroyErr != nil { + errs = append(errs, res.DestroyErr) + log.WithError(err). + Error("cannot destroy obsolete replication cursor bookmark") + } else { + destroyed = append(destroyed, res.Abstraction) + log.Info("destroyed obsolete replication cursor bookmark") + } + } + if len(errs) == 0 { + return destroyed, nil + } else { + return destroyed, errorarray.Wrap(errs, "destroy obsolete replication cursor") + } +} + +var lastReceivedHoldTagRE = regexp.MustCompile("^zrepl_last_received_J_(.+)$") + +// err != nil always means that the bookmark is not a step bookmark +func ParseLastReceivedHoldTag(tag string) (JobID, error) { + match := lastReceivedHoldTagRE.FindStringSubmatch(tag) + if match == nil { + return JobID{}, errors.Errorf("parse last-received-hold tag: does not match regex %s", lastReceivedHoldTagRE.String()) + } + jobId, err := MakeJobID(match[1]) + if err != nil { + return JobID{}, errors.Wrap(err, "parse last-received-hold tag: invalid job id field") + } + return jobId, nil +} + +func LastReceivedHoldTag(jobID JobID) (string, error) { + return lastReceivedHoldImpl(jobID.String()) +} + +func lastReceivedHoldImpl(jobid string) (string, error) { + tag := fmt.Sprintf("zrepl_last_received_J_%s", jobid) + if err := zfs.ValidHoldTag(tag); err != nil { + return "", err + } + return tag, nil +} + +func MoveLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersion, jobID JobID) error { + + if to.IsSnapshot() { + return errors.Errorf("last-received-hold: target must be a snapshot: %s", to.FullPath(fs)) + } + + tag, err := LastReceivedHoldTag(jobID) + if err != nil { + return errors.Wrap(err, "last-received-hold: hold tag") + } + + // we never want to be without a hold + // => hold new one before releasing old hold + + err = zfs.ZFSHold(ctx, fs, to, tag) + if err != nil { + return errors.Wrap(err, "last-received-hold: hold newly received") + } + + q := ListZFSHoldsAndBookmarksQuery{ + What: AbstractionTypeSet{ + AbstractionLastReceivedHold: true, + }, + FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{ + FS: &fs, + }, + JobID: &jobID, + Until: &InclusiveExclusiveCreateTXG{ + CreateTXG: to.GetCreateTXG(), + Inclusive: &zfs.NilBool{B: false}, + }, + } + abs, absErrs, err := ListAbstractions(ctx, q) + if err != nil { + return errors.Wrap(err, "last-received-hold: list") + } + if len(absErrs) > 0 { + return errors.Wrap(ListAbstractionsErrors(absErrs), "last-received-hold: list") + } + + getLogger(ctx).WithField("last-received-holds", fmt.Sprintf("%s", abs)).Debug("releasing last-received-holds") + + var errs []error + for res := range BatchDestroy(ctx, abs) { + log := getLogger(ctx). + WithField("last-received-hold", res.Abstraction) + if res.DestroyErr != nil { + errs = append(errs, res.DestroyErr) + log.WithError(err). + Error("cannot release last-received-hold") + } else { + log.Info("released last-received-hold") + } + } + if len(errs) == 0 { + return nil + } else { + return errorarray.Wrap(errs, "last-received-hold: release") + } +} + +func ReplicationCursorV2Extractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction) { + if v.Type != zfs.Bookmark { + panic("impl error") + } + fullname := v.ToAbsPath(fs) + guid, jobid, err := ParseReplicationCursorBookmarkName(fullname) + if err == nil { + if guid != v.Guid { + // TODO log this possibly tinkered-with bookmark + return nil + } + return &ListHoldsAndBookmarksOutputBookmark{ + Type: AbstractionReplicationCursorBookmarkV2, + FS: fs.ToString(), + FilesystemVersion: v, + JobID: jobid, + } + } + return nil +} + +func ReplicationCursorV1Extractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction) { + if v.Type != zfs.Bookmark { + panic("impl error") + } + fullname := v.ToAbsPath(fs) + _, _, err := ParseReplicationCursorBookmarkName(fullname) + if err == ErrV1ReplicationCursor { + return &ReplicationCursorV1{ + Type: AbstractionReplicationCursorBookmarkV1, + FS: fs.ToString(), + FilesystemVersion: v, + } + } + return nil +} + +var _ HoldExtractor = LastReceivedHoldExtractor + +func LastReceivedHoldExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion, holdTag string) Abstraction { + var err error + + if v.Type != zfs.Snapshot { + panic("impl error") + } + + jobID, err := ParseLastReceivedHoldTag(holdTag) + if err == nil { + return &ListHoldsAndBookmarksOutputHold{ + Type: AbstractionLastReceivedHold, + FS: fs.ToString(), + FilesystemVersion: v, + Tag: holdTag, + JobID: jobID, + } + } + return nil +} + +type ReplicationCursorV1 struct { + Type AbstractionType + FS string + zfs.FilesystemVersion +} + +func (c ReplicationCursorV1) GetType() AbstractionType { return c.Type } +func (c ReplicationCursorV1) GetFS() string { return c.FS } +func (c ReplicationCursorV1) GetFullPath() string { return fmt.Sprintf("%s#%s", c.FS, c.GetName()) } +func (c ReplicationCursorV1) GetJobID() *JobID { return nil } +func (c ReplicationCursorV1) GetFilesystemVersion() zfs.FilesystemVersion { return c.FilesystemVersion } +func (c ReplicationCursorV1) MarshalJSON() ([]byte, error) { + return json.Marshal(AbstractionJSON{c}) +} +func (c ReplicationCursorV1) String() string { + return fmt.Sprintf("%s %s", c.Type, c.GetFullPath()) +} +func (c ReplicationCursorV1) Destroy(ctx context.Context) error { + if err := zfs.ZFSDestroyIdempotent(c.GetFullPath()); err != nil { + return errors.Wrapf(err, "destroy %s %s: zfs", c.Type, c.GetFullPath()) + } + return nil +} diff --git a/endpoint/endpoint_zfs_abstraction_step.go b/endpoint/endpoint_zfs_abstraction_step.go new file mode 100644 index 0000000..82deb00 --- /dev/null +++ b/endpoint/endpoint_zfs_abstraction_step.go @@ -0,0 +1,266 @@ +package endpoint + +import ( + "context" + "fmt" + "regexp" + + "github.com/pkg/errors" + "github.com/zrepl/zrepl/util/errorarray" + "github.com/zrepl/zrepl/zfs" +) + +var stepHoldTagRE = regexp.MustCompile("^zrepl_STEP_J_(.+)") + +func StepHoldTag(jobid JobID) (string, error) { + return stepHoldTagImpl(jobid.String()) +} + +func stepHoldTagImpl(jobid string) (string, error) { + t := fmt.Sprintf("zrepl_STEP_J_%s", jobid) + if err := zfs.ValidHoldTag(t); err != nil { + return "", err + } + return t, nil +} + +// err != nil always means that the bookmark is not a step bookmark +func ParseStepHoldTag(tag string) (JobID, error) { + match := stepHoldTagRE.FindStringSubmatch(tag) + if match == nil { + return JobID{}, fmt.Errorf("parse hold tag: match regex %q", stepHoldTagRE) + } + jobID, err := MakeJobID(match[1]) + if err != nil { + return JobID{}, errors.Wrap(err, "parse hold tag: invalid job id field") + } + return jobID, nil +} + +const stepBookmarkNamePrefix = "zrepl_STEP" + +// v must be validated by caller +func StepBookmarkName(fs string, guid uint64, id JobID) (string, error) { + return stepBookmarkNameImpl(fs, guid, id.String()) +} + +func stepBookmarkNameImpl(fs string, guid uint64, jobid string) (string, error) { + return makeJobAndGuidBookmarkName(stepBookmarkNamePrefix, fs, guid, jobid) +} + +// name is the full bookmark name, including dataset path +// +// err != nil always means that the bookmark is not a step bookmark +func ParseStepBookmarkName(fullname string) (guid uint64, jobID JobID, err error) { + guid, jobID, err = parseJobAndGuidBookmarkName(fullname, stepBookmarkNamePrefix) + if err != nil { + err = errors.Wrap(err, "parse step bookmark name") // no shadow! + } + return guid, jobID, err +} + +// idempotently hold / step-bookmark `version` +// +// returns ErrBookmarkCloningNotSupported if version is a bookmark and bookmarking bookmarks is not supported by ZFS +func HoldStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID JobID) error { + if v.IsSnapshot() { + + tag, err := StepHoldTag(jobID) + if err != nil { + return errors.Wrap(err, "step hold tag") + } + + if err := zfs.ZFSHold(ctx, fs, v, tag); err != nil { + return errors.Wrap(err, "step hold: zfs") + } + + return nil + } + + if !v.IsBookmark() { + panic(fmt.Sprintf("version must bei either snapshot or bookmark, got %#v", v)) + } + + bmname, err := StepBookmarkName(fs, v.Guid, jobID) + if err != nil { + return errors.Wrap(err, "create step bookmark: determine bookmark name") + } + // idempotently create bookmark + err = zfs.ZFSBookmark(fs, v.ToSendArgVersion(), bmname) + if err != nil { + if err == zfs.ErrBookmarkCloningNotSupported { + // TODO we could actually try to find a local snapshot that has the requested GUID + // however, the replication algorithm prefers snapshots anyways, so this quest + // is most likely not going to be successful. Also, there's the possibility that + // the caller might want to filter what snapshots are eligibile, and this would + // complicate things even further. + return err // TODO go1.13 use wrapping + } + return errors.Wrap(err, "create step bookmark: zfs") + } + return nil +} + +// idempotently release the step-hold on v if v is a snapshot +// or idempotently destroy the step-bookmark of v if v is a bookmark +// +// note that this operation leaves v itself untouched, unless v is the step-bookmark itself, in which case v is destroyed +// +// returns an instance of *zfs.DatasetDoesNotExist if `v` does not exist +func ReleaseStep(ctx context.Context, fs string, v zfs.FilesystemVersion, jobID JobID) error { + + if v.IsSnapshot() { + tag, err := StepHoldTag(jobID) + if err != nil { + return errors.Wrap(err, "step release tag") + } + + if err := zfs.ZFSRelease(ctx, tag, v.FullPath(fs)); err != nil { + return errors.Wrap(err, "step release: zfs") + } + + return nil + } + if !v.IsBookmark() { + panic(fmt.Sprintf("impl error: expecting version to be a bookmark, got %#v", v)) + } + + bmname, err := StepBookmarkName(fs, v.Guid, jobID) + if err != nil { + return errors.Wrap(err, "step release: determine bookmark name") + } + // idempotently destroy bookmark + + if err := zfs.ZFSDestroyIdempotent(bmname); err != nil { + return errors.Wrap(err, "step release: bookmark destroy: zfs") + } + + return nil +} + +// release {step holds, step bookmarks} earlier and including `mostRecent` +func ReleaseStepCummulativeInclusive(ctx context.Context, fs string, mostRecent zfs.FilesystemVersion, jobID JobID) error { + q := ListZFSHoldsAndBookmarksQuery{ + What: AbstractionTypeSet{ + AbstractionStepHold: true, + AbstractionStepBookmark: true, + }, + FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{ + FS: &fs, + }, + JobID: &jobID, + Until: &InclusiveExclusiveCreateTXG{ + CreateTXG: mostRecent.CreateTXG, + Inclusive: &zfs.NilBool{B: true}, + }, + } + abs, absErrs, err := ListAbstractions(ctx, q) + if err != nil { + return errors.Wrap(err, "step release cummulative: list") + } + if len(absErrs) > 0 { + return errors.Wrap(ListAbstractionsErrors(absErrs), "step release cummulative: list") + } + + getLogger(ctx).WithField("step_holds_and_bookmarks", fmt.Sprintf("%s", abs)).Debug("releasing step holds and bookmarks") + + var errs []error + for res := range BatchDestroy(ctx, abs) { + log := getLogger(ctx). + WithField("step_hold_or_bookmark", res.Abstraction) + if res.DestroyErr != nil { + errs = append(errs, res.DestroyErr) + log.WithError(err). + Error("cannot release step hold or bookmark") + } else { + log.Info("released step hold or bookmark") + } + } + if len(errs) == 0 { + return nil + } else { + return errorarray.Wrap(errs, "step release cummulative: release") + } +} + +func TryReleaseStepStaleFS(ctx context.Context, fs string, jobID JobID) { + + q := ListZFSHoldsAndBookmarksQuery{ + FS: ListZFSHoldsAndBookmarksQueryFilesystemFilter{ + FS: &fs, + }, + JobID: &jobID, + What: AbstractionTypeSet{ + AbstractionStepHold: true, + AbstractionStepBookmark: true, + }, + } + staleness, err := ListStale(ctx, q) + if err != nil { + getLogger(ctx).WithError(err).Error("cannot list stale step holds and bookmarks") + return + } + for _, s := range staleness.Stale { + getLogger(ctx).WithField("stale_step_hold_or_bookmark", s).Info("batch-destroying stale step hold or bookmark") + } + for res := range BatchDestroy(ctx, staleness.Stale) { + if res.DestroyErr != nil { + getLogger(ctx). + WithField("stale_step_hold_or_bookmark", res.Abstraction). + WithError(res.DestroyErr). + Error("cannot destroy stale step-hold or bookmark") + } else { + getLogger(ctx). + WithField("stale_step_hold_or_bookmark", res.Abstraction). + WithError(res.DestroyErr). + Info("destroyed stale step-hold or bookmark") + } + } + +} + +var _ BookmarkExtractor = StepBookmarkExtractor + +func StepBookmarkExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion) (_ Abstraction) { + if v.Type != zfs.Bookmark { + panic("impl error") + } + + fullname := v.ToAbsPath(fs) + + guid, jobid, err := ParseStepBookmarkName(fullname) + if guid != v.Guid { + // TODO log this possibly tinkered-with bookmark + return nil + } + if err == nil { + bm := &ListHoldsAndBookmarksOutputBookmark{ + Type: AbstractionStepBookmark, + FS: fs.ToString(), + FilesystemVersion: v, + JobID: jobid, + } + return bm + } + return nil +} + +var _ HoldExtractor = StepHoldExtractor + +func StepHoldExtractor(fs *zfs.DatasetPath, v zfs.FilesystemVersion, holdTag string) Abstraction { + if v.Type != zfs.Snapshot { + panic("impl error") + } + + jobID, err := ParseStepHoldTag(holdTag) + if err == nil { + return &ListHoldsAndBookmarksOutputHold{ + Type: AbstractionStepHold, + FS: fs.ToString(), + Tag: holdTag, + FilesystemVersion: v, + JobID: jobID, + } + } + return nil +} diff --git a/endpoint/endpoint_zfs_helpers.go b/endpoint/endpoint_zfs_helpers_repr.go similarity index 52% rename from endpoint/endpoint_zfs_helpers.go rename to endpoint/endpoint_zfs_helpers_repr.go index 321b5d6..50dc577 100644 --- a/endpoint/endpoint_zfs_helpers.go +++ b/endpoint/endpoint_zfs_helpers_repr.go @@ -1,7 +1,6 @@ package endpoint import ( - "context" "fmt" "regexp" "strconv" @@ -57,53 +56,3 @@ func parseJobAndGuidBookmarkName(fullname string, prefix string) (guid uint64, j return guid, jobID, nil } - -func destroyBookmarksOlderThan(ctx context.Context, fs string, mostRecent *zfs.ZFSSendArgVersion, jobID JobID, filter func(shortname string) (accept bool)) (destroyed []zfs.FilesystemVersion, err error) { - if filter == nil { - panic(filter) - } - - fsp, err := zfs.NewDatasetPath(fs) - if err != nil { - return nil, errors.Wrap(err, "invalid filesystem path") - } - - mostRecentProps, err := mostRecent.ValidateExistsAndGetCheckedProps(ctx, fs) - if err != nil { - return nil, errors.Wrap(err, "validate most recent version argument") - } - - stepBookmarks, err := zfs.ZFSListFilesystemVersions(fsp, zfs.FilterFromClosure( - func(t zfs.VersionType, name string) (accept bool, err error) { - if t != zfs.Bookmark { - return false, nil - } - return filter(name), nil - })) - if err != nil { - return nil, errors.Wrap(err, "list bookmarks") - } - - // cut off all bookmarks prior to mostRecent's CreateTXG - var destroy []zfs.FilesystemVersion - for _, v := range stepBookmarks { - if v.Type != zfs.Bookmark { - panic("implementation error") - } - if !filter(v.Name) { - panic("inconsistent filter result") - } - if v.CreateTXG < mostRecentProps.CreateTXG { - destroy = append(destroy, v) - } - } - - // FIXME use batch destroy, must adopt code to handle bookmarks - for _, v := range destroy { - if err := zfs.ZFSDestroyIdempotent(v.ToAbsPath(fsp)); err != nil { - return nil, errors.Wrap(err, "destroy bookmark") - } - } - - return destroy, nil -} diff --git a/endpoint/endpoint_zfs_helpers_test.go b/endpoint/endpoint_zfs_helpers_repr_test.go similarity index 100% rename from endpoint/endpoint_zfs_helpers_test.go rename to endpoint/endpoint_zfs_helpers_repr_test.go diff --git a/endpoint/endpoint_zfs_helpers_types.go b/endpoint/endpoint_zfs_helpers_types.go new file mode 100644 index 0000000..6e0a0cb --- /dev/null +++ b/endpoint/endpoint_zfs_helpers_types.go @@ -0,0 +1,73 @@ +package endpoint + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/pkg/errors" + "github.com/zrepl/zrepl/zfs" +) + +type ListHoldsAndBookmarksOutputBookmark struct { + Type AbstractionType + FS string + zfs.FilesystemVersion + JobID JobID +} + +func (b ListHoldsAndBookmarksOutputBookmark) GetType() AbstractionType { return b.Type } +func (b ListHoldsAndBookmarksOutputBookmark) GetFS() string { return b.FS } +func (b ListHoldsAndBookmarksOutputBookmark) GetJobID() *JobID { return &b.JobID } +func (b ListHoldsAndBookmarksOutputBookmark) GetFullPath() string { + return fmt.Sprintf("%s#%s", b.FS, b.Name) // TODO use zfs.FilesystemVersion.ToAbsPath +} +func (b ListHoldsAndBookmarksOutputBookmark) MarshalJSON() ([]byte, error) { + return json.Marshal(AbstractionJSON{b}) +} +func (b ListHoldsAndBookmarksOutputBookmark) String() string { + return fmt.Sprintf("%s %s", b.Type, b.GetFullPath()) +} + +func (b ListHoldsAndBookmarksOutputBookmark) GetFilesystemVersion() zfs.FilesystemVersion { + return b.FilesystemVersion +} + +func (b ListHoldsAndBookmarksOutputBookmark) Destroy(ctx context.Context) error { + if err := zfs.ZFSDestroyIdempotent(b.GetFullPath()); err != nil { + return errors.Wrapf(err, "destroy %s: zfs", b) + } + return nil +} + +type ListHoldsAndBookmarksOutputHold struct { + Type AbstractionType + FS string + zfs.FilesystemVersion + Tag string + JobID JobID +} + +func (h ListHoldsAndBookmarksOutputHold) GetType() AbstractionType { return h.Type } +func (h ListHoldsAndBookmarksOutputHold) GetFS() string { return h.FS } +func (h ListHoldsAndBookmarksOutputHold) GetJobID() *JobID { return &h.JobID } +func (h ListHoldsAndBookmarksOutputHold) GetFullPath() string { + return fmt.Sprintf("%s@%s", h.FS, h.GetName()) // TODO use zfs.FilesystemVersion.ToAbsPath +} +func (h ListHoldsAndBookmarksOutputHold) MarshalJSON() ([]byte, error) { + return json.Marshal(AbstractionJSON{h}) +} +func (h ListHoldsAndBookmarksOutputHold) String() string { + return fmt.Sprintf("%s %q on %s", h.Type, h.Tag, h.GetFullPath()) +} + +func (h ListHoldsAndBookmarksOutputHold) GetFilesystemVersion() zfs.FilesystemVersion { + return h.FilesystemVersion +} + +func (h ListHoldsAndBookmarksOutputHold) Destroy(ctx context.Context) error { + if err := zfs.ZFSRelease(ctx, h.Tag, h.GetFullPath()); err != nil { + return errors.Wrapf(err, "release %s: zfs", h) + } + return nil +} diff --git a/go.mod b/go.mod index 2bbbea1..b90cb03 100644 --- a/go.mod +++ b/go.mod @@ -33,5 +33,6 @@ require ( golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 golang.org/x/sync v0.0.0-20190423024810-112230192c58 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 + gonum.org/v1/gonum v0.7.0 // indirect google.golang.org/grpc v1.17.0 ) diff --git a/go.sum b/go.sum index 4b55d9a..8fe0f61 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,7 @@ github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q github.com/OpenPeeDeeP/depguard v0.0.0-20180806142446-a69c782687b2/go.mod h1:7/4sitnI9YlQgTLLk734QlzXT8DuHVnAyztLplQjk+o= github.com/OpenPeeDeeP/depguard v0.0.0-20181229194401-1f388ab2d810/go.mod h1:7/4sitnI9YlQgTLLk734QlzXT8DuHVnAyztLplQjk+o= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= +github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -28,6 +29,7 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/fatih/color v1.6.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ftrvxmtrx/fd v0.0.0-20150925145434-c6d800382fff h1:zk1wwii7uXmI0znwU+lqg+wFL9G5+vm5I+9rv2let60= @@ -70,6 +72,7 @@ github.com/go-toolsmith/typep v1.0.0/go.mod h1:JSQCQMUPdRlMZFswiq3TGpNp1GMktqkR2 github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E= @@ -122,6 +125,7 @@ github.com/jinzhu/copier v0.0.0-20170922082739-db4671f3a9b8/go.mod h1:yL958EeXv8 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM= github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= @@ -289,6 +293,11 @@ golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2 h1:y102fOLFqhV41b+4GPiJoa0k/x+pJcEi2/HB1Y5T6fU= +golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/net v0.0.0-20170915142106-8351a756f30f/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -328,17 +337,24 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20170915040203-e531a2a1c15f/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181117154741-2ddaf7f79a09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181205014116-22934f0fdb62/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190110163146-51295c7ec13a/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190121143147-24cd39ecf745/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190213192042-740235f6c0d8/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190311215038-5c2858a9cfe5/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190322203728-c1a832b0ad89/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190521203540-521d6ed310dd/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190524210228-3d17549cdc6b/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= +gonum.org/v1/gonum v0.7.0 h1:Hdks0L0hgznZLG9nzXb8vZ0rRvqNvAcgAp84y7Mwkgw= +gonum.org/v1/gonum v0.7.0/go.mod h1:L02bwd0sqlsvRv41G7wGWFCsVNZFv/k1xzGIxeANHGM= +gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= +gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRSs= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= @@ -350,6 +366,7 @@ google.golang.org/grpc v1.17.0 h1:TRJYBgMclJvGYn2rIMjj+h9KtMt5r1Ij7ODVRIZkwhk= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -368,5 +385,6 @@ mvdan.cc/interfacer v0.0.0-20180901003855-c20040233aed/go.mod h1:Xkxe497xwlCKkIa mvdan.cc/lint v0.0.0-20170908181259-adc824a0674b/go.mod h1:2odslEg/xrtNQqCYg2/jCoyKnw3vv5biOc3JnIcYfL4= mvdan.cc/unparam v0.0.0-20190209190245-fbb59629db34/go.mod h1:H6SUd1XjIs+qQCyskXg5OFSrilMRUkD8ePJpHKDPaeY= mvdan.cc/unparam v0.0.0-20190310220240-1b9ccfa71afe/go.mod h1:BnhuWBAqxH3+J5bDybdxgw5ZfS+DsVd4iylsKQePN8o= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0= sourcegraph.com/sqs/pbtypes v1.0.0/go.mod h1:3AciMUv4qUuRHRHhOG4TZOB+72GdPVz5k+c648qsFS4= diff --git a/platformtest/tests/batchRelease.go b/platformtest/tests/batchRelease.go deleted file mode 100644 index 7884120..0000000 --- a/platformtest/tests/batchRelease.go +++ /dev/null @@ -1,154 +0,0 @@ -package tests - -import ( - "fmt" - - "github.com/stretchr/testify/require" - - "github.com/zrepl/zrepl/platformtest" - "github.com/zrepl/zrepl/zfs" -) - -type rollupReleaseExpectTags struct { - Snap string - Holds map[string]bool -} - -func rollupReleaseTest(ctx *platformtest.Context, cb func(fs string) []rollupReleaseExpectTags) { - - platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, ` - DESTROYROOT - CREATEROOT - + "foo bar" - + "foo bar@1" - + "foo bar@2" - + "foo bar@3" - + "foo bar@4" - + "foo bar@5" - + "foo bar@6" - R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@1" - R zfs hold zrepl_platformtest_2 "${ROOTDS}/foo bar@2" - R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@3" - R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@5" - R zfs hold zrepl_platformtest "${ROOTDS}/foo bar@6" - R zfs bookmark "${ROOTDS}/foo bar@5" "${ROOTDS}/foo bar#5" -`) - - fs := fmt.Sprintf("%s/foo bar", ctx.RootDataset) - - expTags := cb(fs) - - for _, exp := range expTags { - holds, err := zfs.ZFSHolds(ctx, fs, exp.Snap) - if err != nil { - panic(err) - } - for _, h := range holds { - if e, ok := exp.Holds[h]; !ok || !e { - panic(fmt.Sprintf("tag %q on snap %q not expected", h, exp.Snap)) - } - } - } - -} - -func RollupReleaseIncluding(ctx *platformtest.Context) { - rollupReleaseTest(ctx, func(fs string) []rollupReleaseExpectTags { - guid5, err := zfs.ZFSGetGUID(fs, "@5") - require.NoError(ctx, err) - - err = zfs.ZFSReleaseAllOlderAndIncludingGUID(ctx, fs, guid5, "zrepl_platformtest") - require.NoError(ctx, err) - - return []rollupReleaseExpectTags{ - {"1", map[string]bool{}}, - {"2", map[string]bool{"zrepl_platformtest_2": true}}, - {"3", map[string]bool{}}, - {"4", map[string]bool{}}, - {"5", map[string]bool{}}, - {"6", map[string]bool{"zrepl_platformtest": true}}, - } - }) -} - -func RollupReleaseExcluding(ctx *platformtest.Context) { - rollupReleaseTest(ctx, func(fs string) []rollupReleaseExpectTags { - guid5, err := zfs.ZFSGetGUID(fs, "@5") - require.NoError(ctx, err) - - err = zfs.ZFSReleaseAllOlderThanGUID(ctx, fs, guid5, "zrepl_platformtest") - require.NoError(ctx, err) - - return []rollupReleaseExpectTags{ - {"1", map[string]bool{}}, - {"2", map[string]bool{"zrepl_platformtest_2": true}}, - {"3", map[string]bool{}}, - {"4", map[string]bool{}}, - {"5", map[string]bool{"zrepl_platformtest": true}}, - {"6", map[string]bool{"zrepl_platformtest": true}}, - } - }) -} - -func RollupReleaseMostRecentIsBookmarkWithoutSnapshot(ctx *platformtest.Context) { - rollupReleaseTest(ctx, func(fs string) []rollupReleaseExpectTags { - guid5, err := zfs.ZFSGetGUID(fs, "#5") - require.NoError(ctx, err) - - err = zfs.ZFSRelease(ctx, "zrepl_platformtest", fs+"@5") - require.NoError(ctx, err) - - err = zfs.ZFSDestroy(fs + "@5") - require.NoError(ctx, err) - - err = zfs.ZFSReleaseAllOlderAndIncludingGUID(ctx, fs, guid5, "zrepl_platformtest") - require.NoError(ctx, err) - - return []rollupReleaseExpectTags{ - {"1", map[string]bool{}}, - {"2", map[string]bool{"zrepl_platformtest_2": true}}, - {"3", map[string]bool{}}, - {"4", map[string]bool{}}, - // {"5", map[string]bool{}}, doesn't exist - {"6", map[string]bool{"zrepl_platformtest": true}}, - } - }) -} - -func RollupReleaseMostRecentIsBookmarkAndSnapshotStillExists(ctx *platformtest.Context) { - rollupReleaseTest(ctx, func(fs string) []rollupReleaseExpectTags { - guid5, err := zfs.ZFSGetGUID(fs, "#5") - require.NoError(ctx, err) - - err = zfs.ZFSReleaseAllOlderAndIncludingGUID(ctx, fs, guid5, "zrepl_platformtest") - require.NoError(ctx, err) - - return []rollupReleaseExpectTags{ - {"1", map[string]bool{}}, - {"2", map[string]bool{"zrepl_platformtest_2": true}}, - {"3", map[string]bool{}}, - {"4", map[string]bool{}}, - {"5", map[string]bool{}}, - {"6", map[string]bool{"zrepl_platformtest": true}}, - } - }) -} - -func RollupReleaseMostRecentDoesntExist(ctx *platformtest.Context) { - rollupReleaseTest(ctx, func(fs string) []rollupReleaseExpectTags { - - const nonexistentGuid = 0 // let's take our chances... - err := zfs.ZFSReleaseAllOlderAndIncludingGUID(ctx, fs, nonexistentGuid, "zrepl_platformtest") - require.Error(ctx, err) - require.Contains(ctx, err.Error(), "cannot find snapshot or bookmark with guid 0") - - return []rollupReleaseExpectTags{ - {"1", map[string]bool{"zrepl_platformtest": true}}, - {"2", map[string]bool{"zrepl_platformtest_2": true}}, - {"3", map[string]bool{"zrepl_platformtest": true}}, - {"4", map[string]bool{"zrepl_platformtest": true}}, - {"5", map[string]bool{"zrepl_platformtest": true}}, - {"6", map[string]bool{"zrepl_platformtest": true}}, - } - }) -} diff --git a/platformtest/tests/helpers.go b/platformtest/tests/helpers.go index d296194..0665672 100644 --- a/platformtest/tests/helpers.go +++ b/platformtest/tests/helpers.go @@ -25,6 +25,14 @@ func sendArgVersion(fs, relName string) zfs.ZFSSendArgVersion { } } +func fsversion(fs, relname string) zfs.FilesystemVersion { + v, err := zfs.ZFSGetFilesystemVersion(fs + relname) + if err != nil { + panic(err) + } + return v +} + func mustDatasetPath(fs string) *zfs.DatasetPath { p, err := zfs.NewDatasetPath(fs) if err != nil { @@ -47,8 +55,8 @@ func mustSnapshot(snap string) { } } -func mustGetProps(entity string) zfs.ZFSPropCreateTxgAndGuidProps { - props, err := zfs.ZFSGetCreateTXGAndGuid(entity) +func mustGetFilesystemVersion(snapOrBookmark string) zfs.FilesystemVersion { + props, err := zfs.ZFSGetFilesystemVersion(snapOrBookmark) check(err) return props } @@ -78,7 +86,7 @@ type dummySnapshotSituation struct { } type resumeSituation struct { - sendArgs zfs.ZFSSendArgs + sendArgs zfs.ZFSSendArgsUnvalidated recvOpts zfs.RecvOptions sendErr, recvErr error recvErrDecoded *zfs.RecvFailedWithResumeTokenErr @@ -107,7 +115,7 @@ func makeDummyDataSnapshots(ctx *platformtest.Context, sendFS string) (situation return situation } -func makeResumeSituation(ctx *platformtest.Context, src dummySnapshotSituation, recvFS string, sendArgs zfs.ZFSSendArgs, recvOptions zfs.RecvOptions) *resumeSituation { +func makeResumeSituation(ctx *platformtest.Context, src dummySnapshotSituation, recvFS string, sendArgs zfs.ZFSSendArgsUnvalidated, recvOptions zfs.RecvOptions) *resumeSituation { situation := &resumeSituation{} @@ -115,8 +123,13 @@ func makeResumeSituation(ctx *platformtest.Context, src dummySnapshotSituation, situation.recvOpts = recvOptions require.True(ctx, recvOptions.SavePartialRecvState, "this method would be pointless otherwise") require.Equal(ctx, sendArgs.FS, src.sendFS) + sendArgsValidated, err := sendArgs.Validate(ctx) + situation.sendErr = err + if err != nil { + return situation + } - copier, err := zfs.ZFSSend(ctx, sendArgs) + copier, err := zfs.ZFSSend(ctx, sendArgsValidated) situation.sendErr = err if err != nil { return situation diff --git a/platformtest/tests/idempotentHold.go b/platformtest/tests/idempotentHold.go index 575cb4f..d53b55d 100644 --- a/platformtest/tests/idempotentHold.go +++ b/platformtest/tests/idempotentHold.go @@ -22,7 +22,7 @@ func IdempotentHold(ctx *platformtest.Context) { `) fs := fmt.Sprintf("%s/foo bar", ctx.RootDataset) - v1 := sendArgVersion(fs, "@1") + v1 := fsversion(fs, "@1") tag := "zrepl_platformtest" err := zfs.ZFSHold(ctx, fs, v1, tag) @@ -34,14 +34,4 @@ func IdempotentHold(ctx *platformtest.Context) { if err != nil { panic(err) } - - vnonexistent := zfs.ZFSSendArgVersion{ - RelName: "@nonexistent", - GUID: 0xbadf00d, - } - err = zfs.ZFSHold(ctx, fs, vnonexistent, tag) - if err == nil { - panic("still expecting error for nonexistent snapshot") - } - } diff --git a/platformtest/tests/replicationCursor.go b/platformtest/tests/replicationCursor.go index 768e32b..d589dc8 100644 --- a/platformtest/tests/replicationCursor.go +++ b/platformtest/tests/replicationCursor.go @@ -31,7 +31,7 @@ func ReplicationCursor(ctx *platformtest.Context) { } fs := ds.ToString() - snap := sendArgVersion(fs, "@1 with space") + snap := fsversion(fs, "@1 with space") destroyed, err := endpoint.MoveReplicationCursor(ctx, fs, &snap, jobid) if err != nil { @@ -39,7 +39,7 @@ func ReplicationCursor(ctx *platformtest.Context) { } assert.Empty(ctx, destroyed) - snapProps, err := zfs.ZFSGetCreateTXGAndGuid(snap.FullPath(fs)) + snapProps, err := zfs.ZFSGetFilesystemVersion(snap.FullPath(fs)) if err != nil { panic(err) } @@ -56,13 +56,13 @@ func ReplicationCursor(ctx *platformtest.Context) { } // try moving - cursor1BookmarkName, err := endpoint.ReplicationCursorBookmarkName(fs, snap.GUID, jobid) + cursor1BookmarkName, err := endpoint.ReplicationCursorBookmarkName(fs, snap.Guid, jobid) require.NoError(ctx, err) - snap2 := sendArgVersion(fs, "@2 with space") + snap2 := fsversion(fs, "@2 with space") destroyed, err = endpoint.MoveReplicationCursor(ctx, fs, &snap2, jobid) require.NoError(ctx, err) require.Equal(ctx, 1, len(destroyed)) - require.Equal(ctx, zfs.Bookmark, destroyed[0].Type) - require.Equal(ctx, cursor1BookmarkName, destroyed[0].Name) + require.Equal(ctx, endpoint.AbstractionReplicationCursorBookmarkV2, destroyed[0].GetType()) + require.Equal(ctx, cursor1BookmarkName, destroyed[0].GetName()) } diff --git a/platformtest/tests/resumableRecvAndTokenHandling.go b/platformtest/tests/resumableRecvAndTokenHandling.go index 96586ad..704f3ab 100644 --- a/platformtest/tests/resumableRecvAndTokenHandling.go +++ b/platformtest/tests/resumableRecvAndTokenHandling.go @@ -25,7 +25,7 @@ func ResumableRecvAndTokenHandling(ctx *platformtest.Context) { src := makeDummyDataSnapshots(ctx, sendFS) - s := makeResumeSituation(ctx, src, recvFS, zfs.ZFSSendArgs{ + s := makeResumeSituation(ctx, src, recvFS, zfs.ZFSSendArgsUnvalidated{ FS: sendFS, To: src.snapA, Encrypted: &zfs.NilBool{B: false}, diff --git a/platformtest/tests/sendArgsValidation.go b/platformtest/tests/sendArgsValidation.go index ceb388a..da6858e 100644 --- a/platformtest/tests/sendArgsValidation.go +++ b/platformtest/tests/sendArgsValidation.go @@ -23,9 +23,9 @@ func SendArgsValidationEncryptedSendOfUnencryptedDatasetForbidden(ctx *platformt `) fs := fmt.Sprintf("%s/send er", ctx.RootDataset) - props := mustGetProps(fs + "@a snap") + props := mustGetFilesystemVersion(fs + "@a snap") - sendArgs := zfs.ZFSSendArgs{ + sendArgs, err := zfs.ZFSSendArgsUnvalidated{ FS: fs, To: &zfs.ZFSSendArgVersion{ RelName: "@a snap", @@ -33,10 +33,15 @@ func SendArgsValidationEncryptedSendOfUnencryptedDatasetForbidden(ctx *platformt }, Encrypted: &zfs.NilBool{B: true}, ResumeToken: "", - } - stream, err := zfs.ZFSSend(ctx, sendArgs) + }.Validate(ctx) + + var stream *zfs.ReadCloserCopier if err == nil { - defer stream.Close() + stream, err = zfs.ZFSSend(ctx, sendArgs) // no shadow + if err == nil { + defer stream.Close() + } + // fallthrough } if expectNotSupportedErr { @@ -76,7 +81,7 @@ func SendArgsValidationResumeTokenEncryptionMismatchForbidden(ctx *platformtest. src := makeDummyDataSnapshots(ctx, sendFS) - unencS := makeResumeSituation(ctx, src, unencRecvFS, zfs.ZFSSendArgs{ + unencS := makeResumeSituation(ctx, src, unencRecvFS, zfs.ZFSSendArgsUnvalidated{ FS: sendFS, To: src.snapA, Encrypted: &zfs.NilBool{B: false}, // ! @@ -85,7 +90,7 @@ func SendArgsValidationResumeTokenEncryptionMismatchForbidden(ctx *platformtest. SavePartialRecvState: true, }) - encS := makeResumeSituation(ctx, src, encRecvFS, zfs.ZFSSendArgs{ + encS := makeResumeSituation(ctx, src, encRecvFS, zfs.ZFSSendArgsUnvalidated{ FS: sendFS, To: src.snapA, Encrypted: &zfs.NilBool{B: true}, // ! @@ -97,16 +102,10 @@ func SendArgsValidationResumeTokenEncryptionMismatchForbidden(ctx *platformtest. // threat model: use of a crafted resume token that requests an unencrypted send // but send args require encrypted send { - var maliciousSend zfs.ZFSSendArgs = encS.sendArgs + var maliciousSend zfs.ZFSSendArgsUnvalidated = encS.sendArgs maliciousSend.ResumeToken = unencS.recvErrDecoded.ResumeTokenRaw - stream, err := zfs.ZFSSend(ctx, maliciousSend) - if err == nil { - defer stream.Close() - } - require.Nil(ctx, stream) - require.Error(ctx, err) - ctx.Logf("send err: %T %s", err, err) + _, err := maliciousSend.Validate(ctx) validationErr, ok := err.(*zfs.ZFSSendArgsValidationError) require.True(ctx, ok) require.Equal(ctx, validationErr.What, zfs.ZFSSendArgsResumeTokenMismatch) @@ -120,14 +119,10 @@ func SendArgsValidationResumeTokenEncryptionMismatchForbidden(ctx *platformtest. // threat model: use of a crafted resume token that requests an encrypted send // but send args require unencrypted send { - var maliciousSend zfs.ZFSSendArgs = unencS.sendArgs + var maliciousSend zfs.ZFSSendArgsUnvalidated = unencS.sendArgs maliciousSend.ResumeToken = encS.recvErrDecoded.ResumeTokenRaw - stream, err := zfs.ZFSSend(ctx, maliciousSend) - if err == nil { - defer stream.Close() - } - require.Nil(ctx, stream) + _, err := maliciousSend.Validate(ctx) require.Error(ctx, err) ctx.Logf("send err: %T %s", err, err) validationErr, ok := err.(*zfs.ZFSSendArgsValidationError) @@ -169,7 +164,7 @@ func SendArgsValidationResumeTokenDifferentFilesystemForbidden(ctx *platformtest src1 := makeDummyDataSnapshots(ctx, sendFS1) src2 := makeDummyDataSnapshots(ctx, sendFS2) - rs := makeResumeSituation(ctx, src1, recvFS, zfs.ZFSSendArgs{ + rs := makeResumeSituation(ctx, src1, recvFS, zfs.ZFSSendArgsUnvalidated{ FS: sendFS1, To: src1.snapA, Encrypted: &zfs.NilBool{B: false}, @@ -180,7 +175,7 @@ func SendArgsValidationResumeTokenDifferentFilesystemForbidden(ctx *platformtest // threat model: forged resume token tries to steal a full send of snapA on fs2 by // presenting a resume token for full send of snapA on fs1 - var maliciousSend zfs.ZFSSendArgs = zfs.ZFSSendArgs{ + var maliciousSend zfs.ZFSSendArgsUnvalidated = zfs.ZFSSendArgsUnvalidated{ FS: sendFS2, To: &zfs.ZFSSendArgVersion{ RelName: src2.snapA.RelName, @@ -189,12 +184,7 @@ func SendArgsValidationResumeTokenDifferentFilesystemForbidden(ctx *platformtest Encrypted: &zfs.NilBool{B: false}, ResumeToken: rs.recvErrDecoded.ResumeTokenRaw, } - - stream, err := zfs.ZFSSend(ctx, maliciousSend) - if err == nil { - defer stream.Close() - } - require.Nil(ctx, stream) + _, err = maliciousSend.Validate(ctx) require.Error(ctx, err) ctx.Logf("send err: %T %s", err, err) validationErr, ok := err.(*zfs.ZFSSendArgsValidationError) diff --git a/platformtest/tests/tests.go b/platformtest/tests/tests.go index 0853399..aacdb8e 100644 --- a/platformtest/tests/tests.go +++ b/platformtest/tests/tests.go @@ -18,11 +18,6 @@ var Cases = []Case{ UndestroyableSnapshotParsing, GetNonexistent, ReplicationCursor, - RollupReleaseIncluding, - RollupReleaseExcluding, - RollupReleaseMostRecentIsBookmarkWithoutSnapshot, - RollupReleaseMostRecentIsBookmarkAndSnapshotStillExists, - RollupReleaseMostRecentDoesntExist, IdempotentHold, IdempotentBookmark, IdempotentDestroy, diff --git a/replication/logic/replication_logic.go b/replication/logic/replication_logic.go index b50a892..689fd54 100644 --- a/replication/logic/replication_logic.go +++ b/replication/logic/replication_logic.go @@ -351,18 +351,19 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) { log.WithField("token", resumeToken).Debug("decode resume token") } - // give both sides a hint about how far the replication got - // This serves as a cumulative variant of SendCompleted and can be useful + // give both sides a hint about how far prior replication attempts got + // This serves as a cummulative variant of SendCompleted and can be useful // for example to release stale holds from an earlier (interrupted) replication. // TODO FIXME: enqueue this as a replication step instead of doing it here during planning // then again, the step should run regardless of planning success // so maybe a separate phase before PLANNING, then? path, conflict := IncrementalPath(rfsvs, sfsvs) - var sender_mrca *pdu.FilesystemVersion // from sfsvs + var sender_mrca *pdu.FilesystemVersion if conflict == nil && len(path) > 0 { sender_mrca = path[0] // shadow } - if sender_mrca != nil { + // yes, sender_mrca may be nil, indicating that we do not have an mrca + { var wg sync.WaitGroup doHint := func(ep Endpoint, name string) { defer wg.Done() @@ -382,8 +383,6 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) { go doHint(fs.sender, "sender") go doHint(fs.receiver, "receiver") wg.Wait() - } else { - log.Debug("cannot identify most recent common ancestor, skipping hint") } var steps []*Step diff --git a/util/errorarray/errorarray.go b/util/errorarray/errorarray.go new file mode 100644 index 0000000..d46f132 --- /dev/null +++ b/util/errorarray/errorarray.go @@ -0,0 +1,43 @@ +package errorarray + +import ( + "fmt" + "strings" +) + +type Errors struct { + Msg string + Wrapped []error +} + +var _ error = (*Errors)(nil) + +func Wrap(errs []error, msg string) Errors { + if len(errs) == 0 { + panic("passing empty errs argument") + } + return Errors{Msg: msg, Wrapped: errs} +} + +func (e Errors) Unwrap() error { + if len(e.Wrapped) == 1 { + return e.Wrapped[0] + } + return nil // ... limitation of the Go 1.13 errors API +} + +func (e Errors) Error() string { + if len(e.Wrapped) == 1 { + return fmt.Sprintf("%s: %s", e.Msg, e.Wrapped[0]) + } + var buf strings.Builder + fmt.Fprintf(&buf, "%s: multiple errors:\n", e.Msg) + for i, err := range e.Wrapped { + fmt.Fprintf(&buf, "%s", err) + if i != len(e.Wrapped)-1 { + fmt.Fprintf(&buf, "\n") + } + } + return buf.String() + +} diff --git a/zfs/holds.go b/zfs/holds.go index 89759a5..a85c022 100644 --- a/zfs/holds.go +++ b/zfs/holds.go @@ -7,8 +7,6 @@ import ( "fmt" "os" "os/exec" - "sort" - "strconv" "strings" "syscall" @@ -36,12 +34,9 @@ func ValidHoldTag(tag string) error { } // Idemptotent: does not return an error if the tag already exists -func ZFSHold(ctx context.Context, fs string, v ZFSSendArgVersion, tag string) error { - if err := v.ValidateInMemory(fs); err != nil { - return errors.Wrap(err, "invalid version") - } +func ZFSHold(ctx context.Context, fs string, v FilesystemVersion, tag string) error { if !v.IsSnapshot() { - return errors.Errorf("can only hold snapshots, got %s", v.RelName) + return errors.Errorf("can only hold snapshots, got %s", v.RelName()) } if err := validateNotEmpty("tag", tag); err != nil { @@ -131,177 +126,7 @@ func ZFSRelease(ctx context.Context, tag string, snaps ...string) error { debug("zfs release: no such tag lines=%v otherLines=%v", noSuchTagLines, otherLines) } if len(otherLines) > 0 { - return fmt.Errorf("unknown zfs error while releasing hold with tag %q: unidentified stderr lines\n%s", tag, strings.Join(otherLines, "\n")) + return fmt.Errorf("unknown zfs error while releasing hold with tag %q:\n%s", tag, strings.Join(otherLines, "\n")) } return nil } - -// Idempotent: if the hold doesn't exist, this is not an error -func ZFSReleaseAllOlderAndIncludingGUID(ctx context.Context, fs string, snapOrBookmarkGuid uint64, tag string) error { - return doZFSReleaseAllOlderAndIncOrExcludingGUID(ctx, fs, snapOrBookmarkGuid, tag, true) -} - -// Idempotent: if the hold doesn't exist, this is not an error -func ZFSReleaseAllOlderThanGUID(ctx context.Context, fs string, snapOrBookmarkGuid uint64, tag string) error { - return doZFSReleaseAllOlderAndIncOrExcludingGUID(ctx, fs, snapOrBookmarkGuid, tag, false) -} - -type zfsReleaseAllOlderAndIncOrExcludingGUIDZFSListLine struct { - entityType EntityType - name string - createtxg uint64 - guid uint64 - userrefs uint64 // always 0 for bookmarks -} - -func doZFSReleaseAllOlderAndIncOrExcludingGUID(ctx context.Context, fs string, snapOrBookmarkGuid uint64, tag string, includeGuid bool) error { - // TODO channel program support still unreleased but - // might be a huge performance improvement - // https://github.com/zfsonlinux/zfs/pull/7902/files - - if err := validateZFSFilesystem(fs); err != nil { - return errors.Wrap(err, "`fs` is not a valid filesystem path") - } - if tag == "" { - return fmt.Errorf("`tag` must not be empty`") - } - - output, err := exec.CommandContext(ctx, - "zfs", "list", "-o", "type,name,createtxg,guid,userrefs", - "-H", "-t", "snapshot,bookmark", "-r", "-d", "1", fs).CombinedOutput() - if err != nil { - return &ZFSError{output, errors.Wrap(err, "cannot list snapshots and their userrefs")} - } - - lines, err := doZFSReleaseAllOlderAndIncOrExcludingGUIDParseListOutput(output) - if err != nil { - return errors.Wrap(err, "unexpected ZFS output") - } - - releaseSnaps, err := doZFSReleaseAllOlderAndIncOrExcludingGUIDFindSnapshots(snapOrBookmarkGuid, includeGuid, lines) - if err != nil { - return err - } - - if len(releaseSnaps) == 0 { - return nil - } - return ZFSRelease(ctx, tag, releaseSnaps...) -} - -func doZFSReleaseAllOlderAndIncOrExcludingGUIDParseListOutput(output []byte) ([]zfsReleaseAllOlderAndIncOrExcludingGUIDZFSListLine, error) { - - scan := bufio.NewScanner(bytes.NewReader(output)) - - var lines []zfsReleaseAllOlderAndIncOrExcludingGUIDZFSListLine - - for scan.Scan() { - const numCols = 5 - comps := strings.SplitN(scan.Text(), "\t", numCols) - if len(comps) != numCols { - return nil, fmt.Errorf("not %d columns\n%s", numCols, output) - } - dstype := comps[0] - name := comps[1] - - var entityType EntityType - switch dstype { - case "snapshot": - entityType = EntityTypeSnapshot - case "bookmark": - entityType = EntityTypeBookmark - default: - return nil, fmt.Errorf("column 0 is %q, expecting \"snapshot\" or \"bookmark\"", dstype) - } - - createtxg, err := strconv.ParseUint(comps[2], 10, 64) - if err != nil { - return nil, fmt.Errorf("cannot parse createtxg %q: %s\n%s", comps[2], err, output) - } - - guid, err := strconv.ParseUint(comps[3], 10, 64) - if err != nil { - return nil, fmt.Errorf("cannot parse guid %q: %s\n%s", comps[3], err, output) - } - - var userrefs uint64 - switch entityType { - case EntityTypeBookmark: - if comps[4] != "-" { - return nil, fmt.Errorf("entity type \"bookmark\" should have userrefs=\"-\", got %q", comps[4]) - } - userrefs = 0 - case EntityTypeSnapshot: - userrefs, err = strconv.ParseUint(comps[4], 10, 64) // shadow - if err != nil { - return nil, fmt.Errorf("cannot parse userrefs %q: %s\n%s", comps[4], err, output) - } - default: - panic(entityType) - } - - lines = append(lines, zfsReleaseAllOlderAndIncOrExcludingGUIDZFSListLine{ - entityType: entityType, - name: name, - createtxg: createtxg, - guid: guid, - userrefs: userrefs, - }) - } - - return lines, nil - -} - -func doZFSReleaseAllOlderAndIncOrExcludingGUIDFindSnapshots(snapOrBookmarkGuid uint64, includeGuid bool, lines []zfsReleaseAllOlderAndIncOrExcludingGUIDZFSListLine) (releaseSnaps []string, err error) { - - // sort lines by createtxg,(snap < bookmark) - // we cannot do this using zfs list -s because `type` is not a - sort.Slice(lines, func(i, j int) (less bool) { - if lines[i].createtxg == lines[j].createtxg { - iET := func(t EntityType) int { - switch t { - case EntityTypeSnapshot: - return 0 - case EntityTypeBookmark: - return 1 - default: - panic("unexpected entity type " + t.String()) - } - } - return iET(lines[i].entityType) < iET(lines[j].entityType) - } - return lines[i].createtxg < lines[j].createtxg - }) - - // iterate over snapshots oldest to newest and collect snapshots that have holds and - // are older than (inclusive or exclusive, depends on includeGuid) a snapshot or bookmark - // with snapOrBookmarkGuid - foundGuid := false - for _, line := range lines { - if line.guid == snapOrBookmarkGuid { - foundGuid = true - } - if line.userrefs > 0 { - if !foundGuid || (foundGuid && includeGuid) { - // only snapshots have userrefs > 0, no need to check entityType - releaseSnaps = append(releaseSnaps, line.name) - } - } - if foundGuid { - // The secondary key in sorting (snap < bookmark) guarantees that we - // A) either found the snapshot with snapOrBookmarkGuid - // B) or no snapshot with snapGuid exists, but one or more bookmarks of it exists - // In the case of A, we already added the snapshot to releaseSnaps if includeGuid requests it, - // and can ignore possible subsequent bookmarks of the snapshot. - // In the case of B, there is nothing to add to releaseSnaps. - break - } - } - - if !foundGuid { - return nil, fmt.Errorf("cannot find snapshot or bookmark with guid %v", snapOrBookmarkGuid) - } - - return releaseSnaps, nil -} diff --git a/zfs/holds_test.go b/zfs/holds_test.go deleted file mode 100644 index 6443d20..0000000 --- a/zfs/holds_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package zfs - -import ( - "testing" - - "github.com/kr/pretty" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestDoZFSReleaseAllOlderAndIncOrExcludingGUIDFindSnapshots(t *testing.T) { - - // what we test here: sort bookmark #3 before @3 - // => assert that the function doesn't stop at the first guid match - // (which might be a bookmark, depending on zfs list ordering) - // but instead considers the entire stride of bookmarks and snapshots with that guid - // - // also, throw in unordered createtxg for good measure - list, err := doZFSReleaseAllOlderAndIncOrExcludingGUIDParseListOutput( - []byte("snapshot\tfoo@1\t1\t1013001\t1\n" + - "snapshot\tfoo@2\t2\t2013002\t1\n" + - "bookmark\tfoo#3\t3\t7013003\t-\n" + - "snapshot\tfoo@6\t6\t5013006\t1\n" + - "snapshot\tfoo@3\t3\t7013003\t1\n" + - "snapshot\tfoo@4\t3\t6013004\t1\n" + - ""), - ) - require.NoError(t, err) - t.Log(pretty.Sprint(list)) - require.Equal(t, 6, len(list)) - require.Equal(t, EntityTypeBookmark, list[2].entityType) - - releaseSnaps, err := doZFSReleaseAllOlderAndIncOrExcludingGUIDFindSnapshots(7013003, true, list) - t.Logf("releasedSnaps = %#v", releaseSnaps) - assert.NoError(t, err) - - assert.Equal(t, []string{"foo@1", "foo@2", "foo@3"}, releaseSnaps) -} diff --git a/zfs/versions.go b/zfs/versions.go index 1c12108..8688890 100644 --- a/zfs/versions.go +++ b/zfs/versions.go @@ -55,6 +55,7 @@ func DecomposeVersionString(v string) (fs string, versionType VersionType, name } } +// The data in a FilesystemVersion is guaranteed to stem from a ZFS CLI invocation. type FilesystemVersion struct { Type VersionType @@ -72,9 +73,16 @@ type FilesystemVersion struct { Creation time.Time } -func (v FilesystemVersion) String() string { +func (v FilesystemVersion) GetCreateTXG() uint64 { return v.CreateTXG } +func (v FilesystemVersion) GetGUID() uint64 { return v.Guid } +func (v FilesystemVersion) GetGuid() uint64 { return v.Guid } +func (v FilesystemVersion) GetName() string { return v.Name } +func (v FilesystemVersion) IsSnapshot() bool { return v.Type == Snapshot } +func (v FilesystemVersion) IsBookmark() bool { return v.Type == Bookmark } +func (v FilesystemVersion) RelName() string { return fmt.Sprintf("%s%s", v.Type.DelimiterChar(), v.Name) } +func (v FilesystemVersion) String() string { return v.RelName() } func (v FilesystemVersion) ToAbsPath(p *DatasetPath) string { var b bytes.Buffer @@ -84,6 +92,49 @@ func (v FilesystemVersion) ToAbsPath(p *DatasetPath) string { return b.String() } +func (v FilesystemVersion) FullPath(fs string) string { + return fmt.Sprintf("%s%s", fs, v.RelName()) +} + +func (v FilesystemVersion) ToSendArgVersion() ZFSSendArgVersion { + return ZFSSendArgVersion{ + RelName: v.RelName(), + GUID: v.Guid, + } +} + +type ParseFilesystemVersionArgs struct { + fullname string + guid, createtxg, creation string +} + +func ParseFilesystemVersion(args ParseFilesystemVersionArgs) (v FilesystemVersion, err error) { + _, v.Type, v.Name, err = DecomposeVersionString(args.fullname) + if err != nil { + return v, err + } + + if v.Guid, err = strconv.ParseUint(args.guid, 10, 64); err != nil { + err = errors.Wrapf(err, "cannot parse GUID %q", args.guid) + return v, err + } + + if v.CreateTXG, err = strconv.ParseUint(args.createtxg, 10, 64); err != nil { + err = errors.Wrapf(err, "cannot parse CreateTXG %q", args.createtxg) + return v, err + } + + creationUnix, err := strconv.ParseInt(args.creation, 10, 64) + if err != nil { + err = errors.Wrapf(err, "cannot parse creation date %q", args.creation) + return v, err + } else { + v.Creation = time.Unix(creationUnix, 0) + } + + return v, nil +} + type FilesystemVersionFilter interface { Filter(t VersionType, name string) (accept bool, err error) } @@ -126,32 +177,17 @@ func ZFSListFilesystemVersions(fs *DatasetPath, filter FilesystemVersionFilter) } line := listResult.Fields - - var v FilesystemVersion - - _, v.Type, v.Name, err = DecomposeVersionString(line[0]) + args := ParseFilesystemVersionArgs{ + fullname: line[0], + guid: line[1], + createtxg: line[2], + creation: line[3], + } + v, err := ParseFilesystemVersion(args) if err != nil { return nil, err } - if v.Guid, err = strconv.ParseUint(line[1], 10, 64); err != nil { - err = errors.Wrap(err, "cannot parse GUID") - return - } - - if v.CreateTXG, err = strconv.ParseUint(line[2], 10, 64); err != nil { - err = errors.Wrap(err, "cannot parse CreateTXG") - return - } - - creationUnix, err := strconv.ParseInt(line[3], 10, 64) - if err != nil { - err = fmt.Errorf("cannot parse creation date '%s': %s", line[3], err) - return nil, err - } else { - v.Creation = time.Unix(creationUnix, 0) - } - accept := true if filter != nil { accept, err = filter.Filter(v.Type, v.Name) @@ -167,3 +203,16 @@ func ZFSListFilesystemVersions(fs *DatasetPath, filter FilesystemVersionFilter) } return } + +func ZFSGetFilesystemVersion(ds string) (v FilesystemVersion, _ error) { + props, err := zfsGet(ds, []string{"createtxg", "guid", "creation"}, sourceAny) + if err != nil { + return v, err + } + return ParseFilesystemVersion(ParseFilesystemVersionArgs{ + fullname: ds, + createtxg: props.Get("createtxg"), + guid: props.Get("guid"), + creation: props.Get("creation"), + }) +} diff --git a/zfs/zfs.go b/zfs/zfs.go index 4764098..9234e8f 100644 --- a/zfs/zfs.go +++ b/zfs/zfs.go @@ -314,7 +314,7 @@ func absVersion(fs string, v *ZFSSendArgVersion) (full string, err error) { // a must already be validated // // SECURITY SENSITIVE because Raw must be handled correctly -func (a ZFSSendArgs) buildCommonSendArgs() ([]string, error) { +func (a ZFSSendArgsUnvalidated) buildCommonSendArgs() ([]string, error) { args := make([]string, 0, 3) // ResumeToken takes precedence, we assume that it has been validated to reflect @@ -525,6 +525,9 @@ type ZFSSendArgVersion struct { GUID uint64 } +func (v ZFSSendArgVersion) GetGuid() uint64 { return v.GUID } +func (v ZFSSendArgVersion) ToSendArgVersion() ZFSSendArgVersion { return v } + func (v ZFSSendArgVersion) ValidateInMemory(fs string) error { if fs == "" { panic(fs) @@ -558,26 +561,26 @@ func (v ZFSSendArgVersion) mustValidateInMemory(fs string) { } // fs must be not empty -func (a ZFSSendArgVersion) ValidateExistsAndGetCheckedProps(ctx context.Context, fs string) (ZFSPropCreateTxgAndGuidProps, error) { +func (a ZFSSendArgVersion) ValidateExistsAndGetVersion(ctx context.Context, fs string) (v FilesystemVersion, _ error) { if err := a.ValidateInMemory(fs); err != nil { - return ZFSPropCreateTxgAndGuidProps{}, nil + return v, nil } - realProps, err := ZFSGetCreateTXGAndGuid(a.FullPath(fs)) + realVersion, err := ZFSGetFilesystemVersion(a.FullPath(fs)) if err != nil { - return ZFSPropCreateTxgAndGuidProps{}, err + return v, err } - if realProps.Guid != a.GUID { - return ZFSPropCreateTxgAndGuidProps{}, fmt.Errorf("`GUID` field does not match real dataset's GUID: %q != %q", realProps.Guid, a.GUID) + if realVersion.Guid != a.GUID { + return v, fmt.Errorf("`GUID` field does not match real dataset's GUID: %q != %q", realVersion.Guid, a.GUID) } - return realProps, nil + return realVersion, nil } func (a ZFSSendArgVersion) ValidateExists(ctx context.Context, fs string) error { - _, err := a.ValidateExistsAndGetCheckedProps(ctx, fs) + _, err := a.ValidateExistsAndGetVersion(ctx, fs) return err } @@ -619,7 +622,7 @@ func (n *NilBool) String() string { } // When updating this struct, check Validate and ValidateCorrespondsToResumeToken (POTENTIALLY SECURITY SENSITIVE) -type ZFSSendArgs struct { +type ZFSSendArgsUnvalidated struct { FS string From, To *ZFSSendArgVersion // From may be nil Encrypted *NilBool @@ -628,6 +631,12 @@ type ZFSSendArgs struct { ResumeToken string // if not nil, must match what is specified in From, To (covered by ValidateCorrespondsToResumeToken) } +type ZFSSendArgsValidated struct { + ZFSSendArgsUnvalidated + FromVersion *FilesystemVersion + ToVersion FilesystemVersion +} + type zfsSendArgsValidationContext struct { encEnabled *NilBool } @@ -642,16 +651,16 @@ const ( ) type ZFSSendArgsValidationError struct { - Args ZFSSendArgs + Args ZFSSendArgsUnvalidated What ZFSSendArgsValidationErrorCode Msg error } -func newValidationError(sendArgs ZFSSendArgs, what ZFSSendArgsValidationErrorCode, cause error) *ZFSSendArgsValidationError { +func newValidationError(sendArgs ZFSSendArgsUnvalidated, what ZFSSendArgsValidationErrorCode, cause error) *ZFSSendArgsValidationError { return &ZFSSendArgsValidationError{sendArgs, what, cause} } -func newGenericValidationError(sendArgs ZFSSendArgs, cause error) *ZFSSendArgsValidationError { +func newGenericValidationError(sendArgs ZFSSendArgsUnvalidated, cause error) *ZFSSendArgsValidationError { return &ZFSSendArgsValidationError{sendArgs, ZFSSendArgsGenericValidationError, cause} } @@ -663,49 +672,57 @@ func (e ZFSSendArgsValidationError) Error() string { // - Make sure that if ResumeToken != "", it reflects the same operation as the other parameters would. // // This function is not pure because GUIDs are checked against the local host's datasets. -func (a ZFSSendArgs) Validate(ctx context.Context) error { +func (a ZFSSendArgsUnvalidated) Validate(ctx context.Context) (v ZFSSendArgsValidated, _ error) { if dp, err := NewDatasetPath(a.FS); err != nil || dp.Length() == 0 { - return newGenericValidationError(a, fmt.Errorf("`FS` must be a valid non-zero dataset path")) + return v, newGenericValidationError(a, fmt.Errorf("`FS` must be a valid non-zero dataset path")) } if a.To == nil { - return newGenericValidationError(a, fmt.Errorf("`To` must not be nil")) + return v, newGenericValidationError(a, fmt.Errorf("`To` must not be nil")) } - if err := a.To.ValidateExists(ctx, a.FS); err != nil { - return newGenericValidationError(a, errors.Wrap(err, "`To` invalid")) + toVersion, err := a.To.ValidateExistsAndGetVersion(ctx, a.FS) + if err != nil { + return v, newGenericValidationError(a, errors.Wrap(err, "`To` invalid")) } + var fromVersion *FilesystemVersion if a.From != nil { - if err := a.From.ValidateExists(ctx, a.FS); err != nil { - return newGenericValidationError(a, errors.Wrap(err, "`From` invalid")) + fromV, err := a.From.ValidateExistsAndGetVersion(ctx, a.FS) + if err != nil { + return v, newGenericValidationError(a, errors.Wrap(err, "`From` invalid")) } + fromVersion = &fromV // fallthrough } if err := a.Encrypted.Validate(); err != nil { - return newGenericValidationError(a, errors.Wrap(err, "`Raw` invalid")) + return v, newGenericValidationError(a, errors.Wrap(err, "`Raw` invalid")) } valCtx := &zfsSendArgsValidationContext{} fsEncrypted, err := ZFSGetEncryptionEnabled(ctx, a.FS) if err != nil { - return newValidationError(a, ZFSSendArgsFSEncryptionCheckFail, + return v, newValidationError(a, ZFSSendArgsFSEncryptionCheckFail, errors.Wrapf(err, "cannot check whether filesystem %q is encrypted", a.FS)) } valCtx.encEnabled = &NilBool{fsEncrypted} if a.Encrypted.B && !fsEncrypted { - return newValidationError(a, ZFSSendArgsEncryptedSendRequestedButFSUnencrypted, + return v, newValidationError(a, ZFSSendArgsEncryptedSendRequestedButFSUnencrypted, errors.Errorf("encrypted send requested, but filesystem %q is not encrypted", a.FS)) } if a.ResumeToken != "" { if err := a.validateCorrespondsToResumeToken(ctx, valCtx); err != nil { - return newValidationError(a, ZFSSendArgsResumeTokenMismatch, err) + return v, newValidationError(a, ZFSSendArgsResumeTokenMismatch, err) } } - return nil + return ZFSSendArgsValidated{ + ZFSSendArgsUnvalidated: a, + FromVersion: fromVersion, + ToVersion: toVersion, + }, nil } type ZFSSendArgsResumeTokenMismatchError struct { @@ -735,7 +752,7 @@ func (c ZFSSendArgsResumeTokenMismatchErrorCode) fmt(format string, args ...inte // This is SECURITY SENSITIVE and requires exhaustive checking of both side's values // An attacker requesting a Send with a crafted ResumeToken may encode different parameters in the resume token than expected: // for example, they may specify another file system (e.g. the filesystem with secret data) or request unencrypted send instead of encrypted raw send. -func (a ZFSSendArgs) validateCorrespondsToResumeToken(ctx context.Context, valCtx *zfsSendArgsValidationContext) error { +func (a ZFSSendArgsUnvalidated) validateCorrespondsToResumeToken(ctx context.Context, valCtx *zfsSendArgsValidationContext) error { if a.ResumeToken == "" { return nil // nothing to do @@ -808,7 +825,7 @@ var ErrEncryptedSendNotSupported = fmt.Errorf("raw sends which are required for // (if from is "" a full ZFS send is done) // // Returns ErrEncryptedSendNotSupported if encrypted send is requested but not supported by CLI -func ZFSSend(ctx context.Context, sendArgs ZFSSendArgs) (*ReadCloserCopier, error) { +func ZFSSend(ctx context.Context, sendArgs ZFSSendArgsValidated) (*ReadCloserCopier, error) { args := make([]string, 0) args = append(args, "send") @@ -825,10 +842,6 @@ func ZFSSend(ctx context.Context, sendArgs ZFSSendArgs) (*ReadCloserCopier, erro } } - if err := sendArgs.Validate(ctx); err != nil { - return nil, err // do not wrap, part of API, tested by platformtest - } - sargs, err := sendArgs.buildCommonSendArgs() if err != nil { return nil, err @@ -959,11 +972,7 @@ func (s *DrySendInfo) unmarshalInfoLine(l string) (regexMatched bool, err error) // to may be "", in which case a full ZFS send is done // May return BookmarkSizeEstimationNotSupported as err if from is a bookmark. -func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgs) (_ *DrySendInfo, err error) { - - if err := sendArgs.Validate(ctx); err != nil { - return nil, errors.Wrap(err, "cannot validate send args") - } +func ZFSSendDry(ctx context.Context, sendArgs ZFSSendArgsValidated) (_ *DrySendInfo, err error) { if sendArgs.From != nil && strings.Contains(sendArgs.From.RelName, "#") { /* TODO: @@ -1458,41 +1467,6 @@ func zfsGet(path string, props []string, allowedSources zfsPropertySource) (*ZFS return res, nil } -type ZFSPropCreateTxgAndGuidProps struct { - CreateTXG, Guid uint64 -} - -func ZFSGetCreateTXGAndGuid(ds string) (ZFSPropCreateTxgAndGuidProps, error) { - props, err := zfsGetNumberProps(ds, []string{"createtxg", "guid"}, sourceAny) - if err != nil { - return ZFSPropCreateTxgAndGuidProps{}, err - } - return ZFSPropCreateTxgAndGuidProps{ - CreateTXG: props["createtxg"], - Guid: props["guid"], - }, nil -} - -// returns *DatasetDoesNotExist if the dataset does not exist -func zfsGetNumberProps(ds string, props []string, src zfsPropertySource) (map[string]uint64, error) { - sps, err := zfsGet(ds, props, sourceAny) - if err != nil { - if _, ok := err.(*DatasetDoesNotExist); ok { - return nil, err // pass through as is - } - return nil, errors.Wrap(err, "zfs: set replication cursor: get snapshot createtxg") - } - r := make(map[string]uint64, len(props)) - for _, p := range props { - v, err := strconv.ParseUint(sps.Get(p), 10, 64) - if err != nil { - return nil, errors.Wrapf(err, "zfs get: parse number property %q", p) - } - r[p] = v - } - return r, nil -} - type DestroySnapshotsError struct { RawLines []string Filesystem string