mirror of
https://github.com/zrepl/zrepl.git
synced 2025-02-16 10:29:54 +01:00
endpoint: fix ListStale for step-* abstractions by using replication cursors and falling back to step holds for initial replication
This commit is contained in:
parent
ac2eb9f86b
commit
f5f94219fd
@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/spf13/pflag"
|
||||
|
||||
"github.com/zrepl/zrepl/cli"
|
||||
"github.com/zrepl/zrepl/endpoint"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
|
@ -119,7 +119,7 @@ func AbstractionTypeSetFromStrings(sts []string) (AbstractionTypeSet, error) {
|
||||
return ats, nil
|
||||
}
|
||||
|
||||
func (s AbstractionTypeSet) Contains(q AbstractionTypeSet) bool {
|
||||
func (s AbstractionTypeSet) ContainsAll(q AbstractionTypeSet) bool {
|
||||
for k := range q {
|
||||
if _, ok := s[k]; !ok {
|
||||
return false
|
||||
@ -128,6 +128,15 @@ func (s AbstractionTypeSet) Contains(q AbstractionTypeSet) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (s AbstractionTypeSet) ContainsAnyOf(q AbstractionTypeSet) bool {
|
||||
for k := range q {
|
||||
if _, ok := s[k]; ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s AbstractionTypeSet) String() string {
|
||||
sts := make([]string, 0, len(s))
|
||||
for i := range s {
|
||||
@ -651,12 +660,30 @@ type fsAndJobId struct {
|
||||
jobId JobID
|
||||
}
|
||||
|
||||
type ListStaleQueryError struct {
|
||||
error
|
||||
}
|
||||
|
||||
// returns *ListStaleQueryError if the given query cannot be used for determining staleness info
|
||||
func ListStale(ctx context.Context, q ListZFSHoldsAndBookmarksQuery) (*StalenessInfo, error) {
|
||||
if !q.CreateTXG.IsUnbounded() {
|
||||
// we must determine the most recent step per FS, can't allow that
|
||||
return nil, errors.New("ListStale cannot have Until != nil set on query")
|
||||
return nil, &ListStaleQueryError{errors.New("ListStale cannot have Until != nil set on query")}
|
||||
}
|
||||
|
||||
// if asking for step holds, must also as for step bookmarks (same kind of abstraction)
|
||||
// as well as replication cursor bookmarks (for firstNotStale)
|
||||
ifAnyThenAll := AbstractionTypeSet{
|
||||
AbstractionStepHold: true,
|
||||
AbstractionStepBookmark: true,
|
||||
AbstractionReplicationCursorBookmarkV2: true,
|
||||
}
|
||||
if q.What.ContainsAnyOf(ifAnyThenAll) && !q.What.ContainsAll(ifAnyThenAll) {
|
||||
return nil, &ListStaleQueryError{errors.Errorf("ListStale requires query to ask for all of %s", ifAnyThenAll.String())}
|
||||
}
|
||||
|
||||
// ----------------- done validating query for listStaleFiltering -----------------------
|
||||
|
||||
qAbs, absErr, err := ListAbstractions(ctx, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -666,34 +693,7 @@ func ListStale(ctx context.Context, q ListZFSHoldsAndBookmarksQuery) (*Staleness
|
||||
return nil, ListAbstractionsErrors(absErr)
|
||||
}
|
||||
|
||||
buildMostRecentFrom := qAbs
|
||||
if !q.What[AbstractionReplicationCursorBookmarkV2] {
|
||||
cq := q
|
||||
cq.What = AbstractionTypeSet{AbstractionReplicationCursorBookmarkV2: true}
|
||||
buildMostRecentFrom, absErr, err = ListAbstractions(ctx, cq) // no shadow
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(absErr) > 0 {
|
||||
// can't go on here because we can't determine the most recent step
|
||||
return nil, ListAbstractionsErrors(absErr)
|
||||
}
|
||||
}
|
||||
mostRecentCursorByFS := make(map[fsAndJobId]*Abstraction) // empty map => will always return nil
|
||||
for _, a := range buildMostRecentFrom {
|
||||
if a.GetType() != AbstractionReplicationCursorBookmarkV2 {
|
||||
continue
|
||||
}
|
||||
key := fsAndJobId{a.GetFS(), *a.GetJobID()}
|
||||
mra := mostRecentCursorByFS[key]
|
||||
if mra == nil || (*mra).GetCreateTXG() < a.GetCreateTXG() {
|
||||
a := a
|
||||
mra = &a
|
||||
}
|
||||
mostRecentCursorByFS[key] = mra
|
||||
}
|
||||
|
||||
si := listStaleFiltering(qAbs, mostRecentCursorByFS)
|
||||
si := listStaleFiltering(qAbs, q.CreateTXG.Since)
|
||||
si.ConstructedWithQuery = q
|
||||
return si, nil
|
||||
}
|
||||
@ -710,7 +710,7 @@ type fsAjobAtype struct {
|
||||
// For replication cursors and last-received-holds, only the most recent one is kept.
|
||||
//
|
||||
// the returned StalenessInfo.ConstructedWithQuery is not set
|
||||
func listStaleFiltering(abs []Abstraction, mostRecentCursorByFS map[fsAndJobId]*Abstraction) *StalenessInfo {
|
||||
func listStaleFiltering(abs []Abstraction, sinceBound *CreateTXGRangeBound) *StalenessInfo {
|
||||
|
||||
var noJobId []Abstraction
|
||||
by := make(map[fsAjobAtype][]Abstraction)
|
||||
@ -725,6 +725,41 @@ func listStaleFiltering(abs []Abstraction, mostRecentCursorByFS map[fsAndJobId]*
|
||||
by[faj] = l
|
||||
}
|
||||
|
||||
type stepFirstNotStaleCandidate struct {
|
||||
cursor *Abstraction
|
||||
step *Abstraction
|
||||
}
|
||||
stepFirstNotStaleCandidates := make(map[fsAndJobId]stepFirstNotStaleCandidate) // empty map => will always return nil
|
||||
for _, a := range abs {
|
||||
key := fsAndJobId{a.GetFS(), *a.GetJobID()}
|
||||
c := stepFirstNotStaleCandidates[key]
|
||||
|
||||
switch a.GetType() {
|
||||
// stepFirstNotStaleCandidate.cursor
|
||||
case AbstractionReplicationCursorBookmarkV2:
|
||||
if c.cursor == nil || (*c.cursor).GetCreateTXG() < a.GetCreateTXG() {
|
||||
a := a
|
||||
c.cursor = &a
|
||||
}
|
||||
|
||||
// stepFirstNotStaleCandidate.step
|
||||
case AbstractionStepBookmark:
|
||||
fallthrough
|
||||
case AbstractionStepHold:
|
||||
if c.step == nil || (*c.step).GetCreateTXG() < a.GetCreateTXG() {
|
||||
a := a
|
||||
c.step = &a
|
||||
}
|
||||
|
||||
// not interested in the others
|
||||
default:
|
||||
continue // not relevant
|
||||
|
||||
}
|
||||
|
||||
stepFirstNotStaleCandidates[key] = c
|
||||
}
|
||||
|
||||
ret := &StalenessInfo{
|
||||
Live: noJobId,
|
||||
Stale: []Abstraction{},
|
||||
@ -732,27 +767,60 @@ func listStaleFiltering(abs []Abstraction, mostRecentCursorByFS map[fsAndJobId]*
|
||||
|
||||
for k := range by {
|
||||
l := by[k]
|
||||
// sort descending (highest createtxg first), then cut off
|
||||
sort.Slice(l, func(i, j int) bool {
|
||||
return l[i].GetCreateTXG() > l[j].GetCreateTXG()
|
||||
})
|
||||
|
||||
if k.Type == AbstractionStepHold || k.Type == AbstractionStepBookmark {
|
||||
// all older than the most recent cursor are stale, others are always live
|
||||
mostRecent := mostRecentCursorByFS[k.fsAndJobId]
|
||||
if mostRecent == nil {
|
||||
ret.Live = append(ret.Live, l...)
|
||||
} else {
|
||||
for _, a := range l {
|
||||
if a.GetCreateTXG() > (*mostRecent).GetCreateTXG() {
|
||||
ret.Live = append(ret.Live, a)
|
||||
} else {
|
||||
ret.Stale = append(ret.Stale, a)
|
||||
|
||||
// if we don't have a replication cursor yet, use untilBound = nil
|
||||
// to consider all steps stale (...at first)
|
||||
var untilBound *CreateTXGRangeBound
|
||||
{
|
||||
sfnsc := stepFirstNotStaleCandidates[k.fsAndJobId]
|
||||
|
||||
// if there's a replication cursor, use it as a cutoff between live and stale
|
||||
// if there's none, we are in initial replication and only need to keep
|
||||
// the most recent step hold live, since that's what our initial replication strategy
|
||||
// uses (both initially and on resume)
|
||||
// (FIXME hardcoded replication strategy)
|
||||
if sfnsc.cursor != nil {
|
||||
untilBound = &CreateTXGRangeBound{
|
||||
CreateTXG: (*sfnsc.cursor).GetCreateTXG(),
|
||||
// if we have a cursor, can throw away step hold on both From and To
|
||||
Inclusive: &zfs.NilBool{B: true},
|
||||
}
|
||||
} else if sfnsc.step != nil {
|
||||
untilBound = &CreateTXGRangeBound{
|
||||
CreateTXG: (*sfnsc.step).GetCreateTXG(),
|
||||
// if we don't have a cursor, the step most recent step hold is our
|
||||
// initial replication cursor and it's possibly still live (interrupted initial replication)
|
||||
Inclusive: &zfs.NilBool{B: false},
|
||||
}
|
||||
} else {
|
||||
untilBound = nil // consider everything stale
|
||||
}
|
||||
}
|
||||
staleRange := CreateTXGRange{
|
||||
Since: sinceBound,
|
||||
Until: untilBound,
|
||||
}
|
||||
|
||||
// partition by staleRange
|
||||
for _, a := range l {
|
||||
if staleRange.Contains(a.GetCreateTXG()) {
|
||||
ret.Stale = append(ret.Stale, a)
|
||||
} else {
|
||||
ret.Live = append(ret.Live, a)
|
||||
}
|
||||
}
|
||||
|
||||
} else if k.Type == AbstractionReplicationCursorBookmarkV2 || k.Type == AbstractionLastReceivedHold {
|
||||
// all but the most recent are stale by definition (we always _move_ them)
|
||||
// NOTE: must not use firstNotStale in this branch, not computed for these types
|
||||
|
||||
// sort descending (highest createtxg first), then cut off
|
||||
sort.Slice(l, func(i, j int) bool {
|
||||
return l[i].GetCreateTXG() > l[j].GetCreateTXG()
|
||||
})
|
||||
if len(l) > 0 {
|
||||
ret.Live = append(ret.Live, l[0])
|
||||
ret.Stale = append(ret.Stale, l[1:]...)
|
||||
|
@ -207,13 +207,16 @@ func TryReleaseStepStaleFS(ctx context.Context, fs string, jobID JobID) {
|
||||
},
|
||||
JobID: &jobID,
|
||||
What: AbstractionTypeSet{
|
||||
AbstractionStepHold: true,
|
||||
AbstractionStepBookmark: true,
|
||||
AbstractionStepHold: true,
|
||||
AbstractionStepBookmark: true,
|
||||
AbstractionReplicationCursorBookmarkV2: true,
|
||||
},
|
||||
Concurrency: 1,
|
||||
}
|
||||
staleness, err := ListStale(ctx, q)
|
||||
if err != nil {
|
||||
if _, ok := err.(*ListStaleQueryError); ok {
|
||||
panic(err)
|
||||
} else if err != nil {
|
||||
getLogger(ctx).WithError(err).Error("cannot list stale step holds and bookmarks")
|
||||
return
|
||||
}
|
||||
|
@ -234,6 +234,7 @@ func resolveConflict(conflict error) (path []*pdu.FilesystemVersion, msg string)
|
||||
if noCommonAncestor, ok := conflict.(*ConflictNoCommonAncestor); ok {
|
||||
if len(noCommonAncestor.SortedReceiverVersions) == 0 {
|
||||
// TODO this is hard-coded replication policy: most recent snapshot as source
|
||||
// NOTE: Keep in sync with listStaleFiltering, it depends on this hard-coded assumption
|
||||
var mostRecentSnap *pdu.FilesystemVersion
|
||||
for n := len(noCommonAncestor.SortedSenderVersions) - 1; n >= 0; n-- {
|
||||
if noCommonAncestor.SortedSenderVersions[n].Type == pdu.FilesystemVersion_Snapshot {
|
||||
|
Loading…
Reference in New Issue
Block a user