mirror of
https://github.com/zrepl/zrepl.git
synced 2025-02-16 18:30:54 +01:00
range-based createtxg queries
This commit is contained in:
parent
86698c2789
commit
e82aea5862
@ -42,7 +42,6 @@ func (f holdsFilterFlags) Query() (endpoint.ListZFSHoldsAndBookmarksQuery, error
|
||||
FS: f.Filesystems.FlagValue(),
|
||||
What: f.Types.FlagValue(),
|
||||
JobID: f.Job.FlagValue(),
|
||||
Until: nil, // TODO support this as a flag
|
||||
Concurrency: f.Concurrency,
|
||||
}
|
||||
return q, q.Validate()
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -205,19 +206,31 @@ type ListZFSHoldsAndBookmarksQuery struct {
|
||||
// if not nil: JobID of the hold or bookmark in question must be equal
|
||||
// else: JobID of the hold or bookmark can be any value
|
||||
JobID *JobID
|
||||
// if not nil: The hold's snapshot or the bookmark's createtxg must be less than (or equal) Until
|
||||
// else: CreateTXG of the hold or bookmark can be any value
|
||||
Until *InclusiveExclusiveCreateTXG
|
||||
|
||||
// zero-value means any CreateTXG is acceptable
|
||||
CreateTXG CreateTXGRange
|
||||
|
||||
// Number of concurrently queried filesystems. Must be >= 1
|
||||
Concurrency int64
|
||||
}
|
||||
|
||||
type InclusiveExclusiveCreateTXG struct {
|
||||
type CreateTXGRangeBound struct {
|
||||
CreateTXG uint64
|
||||
Inclusive *zfs.NilBool // must not be nil
|
||||
}
|
||||
|
||||
// A non-empty range of CreateTXGs
|
||||
//
|
||||
// If both Since and Until are nil, any CreateTXG is acceptable
|
||||
type CreateTXGRange struct {
|
||||
// if not nil: The hold's snapshot or the bookmark's createtxg must be greater than (or equal) Since
|
||||
// else: CreateTXG of the hold or bookmark can be any value accepted by Until
|
||||
Since *CreateTXGRangeBound
|
||||
// if not nil: The hold's snapshot or the bookmark's createtxg must be less than (or equal) Until
|
||||
// else: CreateTXG of the hold or bookmark can be any value accepted by Since
|
||||
Until *CreateTXGRangeBound
|
||||
}
|
||||
|
||||
// FS == nil XOR Filter == nil
|
||||
type ListZFSHoldsAndBookmarksQueryFilesystemFilter struct {
|
||||
FS *string
|
||||
@ -231,10 +244,8 @@ func (q *ListZFSHoldsAndBookmarksQuery) Validate() error {
|
||||
if q.JobID != nil {
|
||||
q.JobID.MustValidate() // FIXME
|
||||
}
|
||||
if q.Until != nil {
|
||||
if err := q.Until.Validate(); err != nil {
|
||||
return errors.Wrap(err, "Until")
|
||||
}
|
||||
if err := q.CreateTXG.Validate(); err != nil {
|
||||
return errors.Wrap(err, "CreateTXGRange")
|
||||
}
|
||||
if err := q.What.Validate(); err != nil {
|
||||
return err
|
||||
@ -245,13 +256,13 @@ func (q *ListZFSHoldsAndBookmarksQuery) Validate() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var zreplEndpointListAbstractionsQueryCreatetxg0Allowed = envconst.Bool("ZREPL_ENDPOINT_LIST_ABSTRACTIONS_QUERY_CREATETXG_0_ALLOWED", false)
|
||||
var createTXGRangeBoundAllowCreateTXG0 = envconst.Bool("ZREPL_ENDPOINT_LIST_ABSTRACTIONS_QUERY_CREATETXG_RANGE_BOUND_ALLOW_0", false)
|
||||
|
||||
func (i *InclusiveExclusiveCreateTXG) Validate() error {
|
||||
func (i *CreateTXGRangeBound) Validate() error {
|
||||
if err := i.Inclusive.Validate(); err != nil {
|
||||
return errors.Wrap(err, "Inclusive")
|
||||
}
|
||||
if i.CreateTXG == 0 && !zreplEndpointListAbstractionsQueryCreatetxg0Allowed {
|
||||
if i.CreateTXG == 0 && !createTXGRangeBoundAllowCreateTXG0 {
|
||||
return errors.New("CreateTXG must be non-zero")
|
||||
}
|
||||
return nil
|
||||
@ -296,6 +307,136 @@ func (f *ListZFSHoldsAndBookmarksQueryFilesystemFilter) Filesystems(ctx context.
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
func (r *CreateTXGRange) Validate() error {
|
||||
if r.Since != nil {
|
||||
if err := r.Since.Validate(); err != nil {
|
||||
return errors.Wrap(err, "Since")
|
||||
}
|
||||
}
|
||||
if r.Until != nil {
|
||||
if err := r.Until.Validate(); err != nil {
|
||||
return errors.Wrap(err, "Until")
|
||||
}
|
||||
}
|
||||
if _, err := r.effectiveBounds(); err != nil {
|
||||
return errors.Wrapf(err, "specified range %s is semantically invalid", r)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// inclusive-inclusive bounds
|
||||
type effectiveBounds struct {
|
||||
sinceInclusive uint64
|
||||
sinceUnbounded bool
|
||||
untilInclusive uint64
|
||||
untilUnbounded bool
|
||||
}
|
||||
|
||||
// callers must have validated r.Since and r.Until before calling this method
|
||||
func (r *CreateTXGRange) effectiveBounds() (bounds effectiveBounds, err error) {
|
||||
|
||||
bounds.sinceUnbounded = r.Since == nil
|
||||
bounds.untilUnbounded = r.Until == nil
|
||||
|
||||
if r.Since == nil && r.Until == nil {
|
||||
return bounds, nil
|
||||
}
|
||||
|
||||
if r.Since != nil {
|
||||
bounds.sinceInclusive = r.Since.CreateTXG
|
||||
if !r.Since.Inclusive.B {
|
||||
if r.Since.CreateTXG == math.MaxUint64 {
|
||||
return bounds, errors.Errorf("Since-exclusive (%v) must be less than math.MaxUint64 (%v)",
|
||||
r.Since.CreateTXG, uint64(math.MaxUint64))
|
||||
}
|
||||
bounds.sinceInclusive++
|
||||
}
|
||||
}
|
||||
|
||||
if r.Until != nil {
|
||||
bounds.untilInclusive = r.Until.CreateTXG
|
||||
if !r.Until.Inclusive.B {
|
||||
if r.Until.CreateTXG == 0 {
|
||||
return bounds, errors.Errorf("Until-exclusive (%v) must be greater than 0", r.Until.CreateTXG)
|
||||
}
|
||||
bounds.untilInclusive--
|
||||
}
|
||||
}
|
||||
|
||||
if !bounds.sinceUnbounded && !bounds.untilUnbounded {
|
||||
if bounds.sinceInclusive >= bounds.untilInclusive {
|
||||
return bounds, errors.Errorf("effective range bounds are [%v,%v] which is empty or invalid", bounds.sinceInclusive, bounds.untilInclusive)
|
||||
} else {
|
||||
// OK, not empty, fallthrough
|
||||
}
|
||||
// fallthrough
|
||||
}
|
||||
|
||||
return bounds, nil
|
||||
}
|
||||
|
||||
func (r *CreateTXGRange) String() string {
|
||||
var buf strings.Builder
|
||||
if r.Since == nil {
|
||||
fmt.Fprintf(&buf, "~")
|
||||
} else {
|
||||
if err := r.Since.Inclusive.Validate(); err != nil {
|
||||
fmt.Fprintf(&buf, "?")
|
||||
} else if r.Since.Inclusive.B {
|
||||
fmt.Fprintf(&buf, "[")
|
||||
} else {
|
||||
fmt.Fprintf(&buf, "(")
|
||||
}
|
||||
fmt.Fprintf(&buf, "%d", r.Since.CreateTXG)
|
||||
}
|
||||
|
||||
fmt.Fprintf(&buf, ",")
|
||||
|
||||
if r.Until == nil {
|
||||
fmt.Fprintf(&buf, "~")
|
||||
} else {
|
||||
fmt.Fprintf(&buf, "%d", r.Until.CreateTXG)
|
||||
if err := r.Until.Inclusive.Validate(); err != nil {
|
||||
fmt.Fprintf(&buf, "?")
|
||||
} else if r.Until.Inclusive.B {
|
||||
fmt.Fprintf(&buf, "]")
|
||||
} else {
|
||||
fmt.Fprintf(&buf, ")")
|
||||
}
|
||||
}
|
||||
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
// panics if not .Validate()
|
||||
func (r *CreateTXGRange) IsUnbounded() bool {
|
||||
if err := r.Validate(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
bounds, err := r.effectiveBounds()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return bounds.sinceUnbounded && bounds.untilUnbounded
|
||||
}
|
||||
|
||||
// panics if not .Validate()
|
||||
func (r *CreateTXGRange) Contains(qCreateTxg uint64) bool {
|
||||
if err := r.Validate(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
bounds, err := r.effectiveBounds()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
sinceMatches := bounds.sinceUnbounded || bounds.sinceInclusive <= qCreateTxg
|
||||
untilMatches := bounds.untilUnbounded || qCreateTxg <= bounds.untilInclusive
|
||||
|
||||
return sinceMatches && untilMatches
|
||||
}
|
||||
|
||||
type ListAbstractionsError struct {
|
||||
FS string
|
||||
Snap string
|
||||
@ -383,16 +524,9 @@ func ListAbstractionsStreamed(ctx context.Context, query ListZFSHoldsAndBookmark
|
||||
emitAbstraction := func(a Abstraction) {
|
||||
jobIdMatches := query.JobID == nil || a.GetJobID() == nil || *a.GetJobID() == *query.JobID
|
||||
|
||||
untilMatches := query.Until == nil
|
||||
if query.Until != nil {
|
||||
if query.Until.Inclusive.B {
|
||||
untilMatches = a.GetCreateTXG() <= query.Until.CreateTXG
|
||||
} else {
|
||||
untilMatches = a.GetCreateTXG() < query.Until.CreateTXG
|
||||
}
|
||||
}
|
||||
createTXGMatches := query.CreateTXG.Contains(a.GetCreateTXG())
|
||||
|
||||
if jobIdMatches && untilMatches {
|
||||
if jobIdMatches && createTXGMatches {
|
||||
out <- a
|
||||
}
|
||||
}
|
||||
@ -451,7 +585,7 @@ func listAbstractionsImplFS(ctx context.Context, fs string, query *ListZFSHoldsA
|
||||
if v.Type == zfs.Bookmark && bmE != nil {
|
||||
a = bmE(fsp, v)
|
||||
}
|
||||
if v.Type == zfs.Snapshot && holdE != nil {
|
||||
if v.Type == zfs.Snapshot && holdE != nil && query.CreateTXG.Contains(v.GetCreateTXG()) {
|
||||
holds, err := zfs.ZFSHolds(ctx, fsp.ToString(), v.Name)
|
||||
if err != nil {
|
||||
errCb(err, v.ToAbsPath(fsp), "get hold on snap")
|
||||
@ -516,7 +650,7 @@ type StalenessInfo struct {
|
||||
}
|
||||
|
||||
func ListStale(ctx context.Context, q ListZFSHoldsAndBookmarksQuery) (*StalenessInfo, error) {
|
||||
if q.Until != nil {
|
||||
if !q.CreateTXG.IsUnbounded() {
|
||||
// we must determine the most recent step per FS, can't allow that
|
||||
return nil, errors.New("ListStale cannot have Until != nil set on query")
|
||||
}
|
||||
|
@ -83,7 +83,7 @@ func GetReplicationCursors(ctx context.Context, dp *zfs.DatasetPath, jobID JobID
|
||||
AbstractionReplicationCursorBookmarkV2: true,
|
||||
},
|
||||
JobID: &jobID,
|
||||
Until: nil,
|
||||
CreateTXG: CreateTXGRange{},
|
||||
Concurrency: 1,
|
||||
}
|
||||
abs, absErr, err := ListAbstractions(ctx, q)
|
||||
@ -173,9 +173,12 @@ func DestroyObsoleteReplicationCursors(ctx context.Context, fs string, current R
|
||||
AbstractionReplicationCursorBookmarkV2: true,
|
||||
},
|
||||
JobID: &jobID,
|
||||
Until: &InclusiveExclusiveCreateTXG{
|
||||
CreateTXG: current.GetCreateTXG(),
|
||||
Inclusive: &zfs.NilBool{B: false},
|
||||
CreateTXG: CreateTXGRange{
|
||||
Since: nil,
|
||||
Until: &CreateTXGRangeBound{
|
||||
CreateTXG: current.GetCreateTXG(),
|
||||
Inclusive: &zfs.NilBool{B: false},
|
||||
},
|
||||
},
|
||||
Concurrency: 1,
|
||||
}
|
||||
@ -262,9 +265,12 @@ func MoveLastReceivedHold(ctx context.Context, fs string, to zfs.FilesystemVersi
|
||||
FS: &fs,
|
||||
},
|
||||
JobID: &jobID,
|
||||
Until: &InclusiveExclusiveCreateTXG{
|
||||
CreateTXG: to.GetCreateTXG(),
|
||||
Inclusive: &zfs.NilBool{B: false},
|
||||
CreateTXG: CreateTXGRange{
|
||||
Since: nil,
|
||||
Until: &CreateTXGRangeBound{
|
||||
CreateTXG: to.GetCreateTXG(),
|
||||
Inclusive: &zfs.NilBool{B: false},
|
||||
},
|
||||
},
|
||||
Concurrency: 1,
|
||||
}
|
||||
|
@ -150,9 +150,12 @@ func ReleaseStepCummulativeInclusive(ctx context.Context, fs string, mostRecent
|
||||
FS: &fs,
|
||||
},
|
||||
JobID: &jobID,
|
||||
Until: &InclusiveExclusiveCreateTXG{
|
||||
CreateTXG: mostRecent.CreateTXG,
|
||||
Inclusive: &zfs.NilBool{B: true},
|
||||
CreateTXG: CreateTXGRange{
|
||||
Since: nil,
|
||||
Until: &CreateTXGRangeBound{
|
||||
CreateTXG: mostRecent.CreateTXG,
|
||||
Inclusive: &zfs.NilBool{B: true},
|
||||
},
|
||||
},
|
||||
Concurrency: 1,
|
||||
}
|
||||
|
233
endpoint/endpoint_zfs_abstraction_test.go
Normal file
233
endpoint/endpoint_zfs_abstraction_test.go
Normal file
@ -0,0 +1,233 @@
|
||||
package endpoint
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"runtime/debug"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
)
|
||||
|
||||
func TestCreateTXGRange(t *testing.T) {
|
||||
|
||||
type testCaseExpectation struct {
|
||||
input uint64
|
||||
expect bool
|
||||
}
|
||||
type testCase struct {
|
||||
name string
|
||||
config *CreateTXGRange
|
||||
configAllowZeroCreateTXG bool
|
||||
expectInvalid bool
|
||||
expectString string
|
||||
expect []testCaseExpectation
|
||||
}
|
||||
|
||||
tcs := []testCase{
|
||||
{
|
||||
name: "unbounded",
|
||||
expectInvalid: false,
|
||||
config: &CreateTXGRange{
|
||||
Since: nil,
|
||||
Until: nil,
|
||||
},
|
||||
expectString: "~,~",
|
||||
expect: []testCaseExpectation{
|
||||
{0, true},
|
||||
{math.MaxUint64, true},
|
||||
{1, true},
|
||||
{math.MaxUint64 - 1, true},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "wrong order obvious",
|
||||
expectInvalid: true,
|
||||
config: &CreateTXGRange{
|
||||
Since: &CreateTXGRangeBound{23, &zfs.NilBool{B: true}},
|
||||
Until: &CreateTXGRangeBound{20, &zfs.NilBool{B: true}},
|
||||
},
|
||||
expectString: "[23,20]",
|
||||
},
|
||||
{
|
||||
name: "wrong order edge-case could also be empty",
|
||||
expectInvalid: true,
|
||||
config: &CreateTXGRange{
|
||||
Since: &CreateTXGRangeBound{23, &zfs.NilBool{B: false}},
|
||||
Until: &CreateTXGRangeBound{22, &zfs.NilBool{B: true}},
|
||||
},
|
||||
expectString: "(23,22]",
|
||||
},
|
||||
{
|
||||
name: "empty",
|
||||
expectInvalid: true,
|
||||
config: &CreateTXGRange{
|
||||
Since: &CreateTXGRangeBound{2, &zfs.NilBool{B: false}},
|
||||
Until: &CreateTXGRangeBound{2, &zfs.NilBool{B: false}},
|
||||
},
|
||||
expectString: "(2,2)",
|
||||
},
|
||||
{
|
||||
name: "inclusive-since-exclusive-until",
|
||||
expectInvalid: false,
|
||||
config: &CreateTXGRange{
|
||||
Since: &CreateTXGRangeBound{2, &zfs.NilBool{B: true}},
|
||||
Until: &CreateTXGRangeBound{5, &zfs.NilBool{B: false}},
|
||||
},
|
||||
expectString: "[2,5)",
|
||||
expect: []testCaseExpectation{
|
||||
{0, false},
|
||||
{1, false},
|
||||
{2, true},
|
||||
{3, true},
|
||||
{4, true},
|
||||
{5, false},
|
||||
{6, false},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "exclusive-since-inclusive-until",
|
||||
expectInvalid: false,
|
||||
config: &CreateTXGRange{
|
||||
Since: &CreateTXGRangeBound{2, &zfs.NilBool{B: false}},
|
||||
Until: &CreateTXGRangeBound{5, &zfs.NilBool{B: true}},
|
||||
},
|
||||
expectString: "(2,5]",
|
||||
expect: []testCaseExpectation{
|
||||
{0, false},
|
||||
{1, false},
|
||||
{2, false},
|
||||
{3, true},
|
||||
{4, true},
|
||||
{5, true},
|
||||
{6, false},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "zero-createtxg-not-allowed-because-likely-programmer-error",
|
||||
expectInvalid: true,
|
||||
config: &CreateTXGRange{
|
||||
Since: nil,
|
||||
Until: &CreateTXGRangeBound{0, &zfs.NilBool{B: true}},
|
||||
},
|
||||
expectString: "~,0]",
|
||||
},
|
||||
{
|
||||
name: "half-open-no-until",
|
||||
expectInvalid: false,
|
||||
config: &CreateTXGRange{
|
||||
Since: &CreateTXGRangeBound{2, &zfs.NilBool{B: false}},
|
||||
Until: nil,
|
||||
},
|
||||
expectString: "(2,~",
|
||||
expect: []testCaseExpectation{
|
||||
{0, false},
|
||||
{1, false},
|
||||
{2, false},
|
||||
{3, true},
|
||||
{4, true},
|
||||
{5, true},
|
||||
{6, true},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "half-open-no-since",
|
||||
expectInvalid: false,
|
||||
config: &CreateTXGRange{
|
||||
Since: nil,
|
||||
Until: &CreateTXGRangeBound{4, &zfs.NilBool{B: true}},
|
||||
},
|
||||
expectString: "~,4]",
|
||||
expect: []testCaseExpectation{
|
||||
{0, true},
|
||||
{1, true},
|
||||
{2, true},
|
||||
{3, true},
|
||||
{4, true},
|
||||
{5, false},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "edgeSince",
|
||||
expectInvalid: false,
|
||||
config: &CreateTXGRange{
|
||||
Since: &CreateTXGRangeBound{math.MaxUint64, &zfs.NilBool{B: true}},
|
||||
Until: nil,
|
||||
},
|
||||
expectString: "[18446744073709551615,~",
|
||||
expect: []testCaseExpectation{
|
||||
{math.MaxUint64, true},
|
||||
{math.MaxUint64 - 1, false},
|
||||
{0, false},
|
||||
{1, false},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "edgeSinceNegative",
|
||||
expectInvalid: true,
|
||||
config: &CreateTXGRange{
|
||||
Since: &CreateTXGRangeBound{math.MaxUint64, &zfs.NilBool{B: false}},
|
||||
Until: nil,
|
||||
},
|
||||
expectString: "(18446744073709551615,~",
|
||||
},
|
||||
{
|
||||
name: "edgeUntil",
|
||||
expectInvalid: false,
|
||||
config: &CreateTXGRange{
|
||||
Until: &CreateTXGRangeBound{0, &zfs.NilBool{B: true}},
|
||||
},
|
||||
configAllowZeroCreateTXG: true,
|
||||
expectString: "~,0]",
|
||||
expect: []testCaseExpectation{
|
||||
{0, true},
|
||||
{math.MaxUint64, false},
|
||||
{1, false},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "edgeUntilNegative",
|
||||
expectInvalid: true,
|
||||
configAllowZeroCreateTXG: true,
|
||||
config: &CreateTXGRange{
|
||||
Until: &CreateTXGRangeBound{0, &zfs.NilBool{B: false}},
|
||||
},
|
||||
expectString: "~,0)",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
require.True(t, tc.expectInvalid != (len(tc.expect) > 0), "invalid test config: must either expect invalid or have expectations: %s", tc.name)
|
||||
require.NotEmpty(t, tc.expectString)
|
||||
assert.Equal(t, tc.expectString, tc.config.String())
|
||||
|
||||
save := createTXGRangeBoundAllowCreateTXG0
|
||||
createTXGRangeBoundAllowCreateTXG0 = tc.configAllowZeroCreateTXG
|
||||
defer func() {
|
||||
createTXGRangeBoundAllowCreateTXG0 = save
|
||||
}()
|
||||
|
||||
if tc.expectInvalid {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
assert.Error(t, tc.config.Validate())
|
||||
})
|
||||
} else {
|
||||
for i, e := range tc.expect {
|
||||
t.Run(fmt.Sprint(i), func(t *testing.T) {
|
||||
defer func() {
|
||||
v := recover()
|
||||
if v != nil {
|
||||
t.Fatalf("should not panic: %T %v\n%s", v, v, debug.Stack())
|
||||
}
|
||||
}()
|
||||
assert.Equal(t, e.expect, tc.config.Contains(e.input))
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user