WIP fix racy holds

fixes #280
This commit is contained in:
Christian Schwarz 2020-02-19 22:51:07 +01:00
parent 3ff1966cab
commit 457cbd136b
5 changed files with 73 additions and 23 deletions

View File

@ -411,6 +411,7 @@ func ListZFSHoldsAndBookmarks(ctx context.Context, fsfilter zfs.DatasetFilter) (
for _, fs := range fss { for _, fs := range fss {
err := listZFSHoldsAndBookmarksImplFS(ctx, out, fs) err := listZFSHoldsAndBookmarksImplFS(ctx, out, fs)
if err != nil { if err != nil {
// FIXME if _, ok := err.(*zfs.DatasetDoesNotExist); ok { noop }
return nil, errors.Wrapf(err, "list holds and bookmarks on %q", fs.ToString()) return nil, errors.Wrapf(err, "list holds and bookmarks on %q", fs.ToString())
} }
} }
@ -420,7 +421,8 @@ func ListZFSHoldsAndBookmarks(ctx context.Context, fsfilter zfs.DatasetFilter) (
func listZFSHoldsAndBookmarksImplFS(ctx context.Context, out *ListHoldsAndBookmarksOutput, fs *zfs.DatasetPath) error { func listZFSHoldsAndBookmarksImplFS(ctx context.Context, out *ListHoldsAndBookmarksOutput, fs *zfs.DatasetPath) error {
fsvs, err := zfs.ZFSListFilesystemVersions(fs, nil) fsvs, err := zfs.ZFSListFilesystemVersions(fs, nil)
if err != nil { if err != nil {
return errors.Wrapf(err, "list filesystem versions of %q", fs) // FIXME if _, ok := err.(*zfs.DatasetDoesNotExist); ok { noop }
return errors.Wrapf(err, "list filesystem versions of %q", fs.ToString())
} }
for _, v := range fsvs { for _, v := range fsvs {
switch v.Type { switch v.Type {
@ -429,7 +431,12 @@ func listZFSHoldsAndBookmarksImplFS(ctx context.Context, out *ListHoldsAndBookma
case zfs.Snapshot: case zfs.Snapshot:
holds, err := zfs.ZFSHolds(ctx, fs.ToString(), v.Name) holds, err := zfs.ZFSHolds(ctx, fs.ToString(), v.Name)
if err != nil { if err != nil {
return errors.Wrapf(err, "get holds of %q", v.ToAbsPath(fs)) if _, ok := err.(*zfs.DatasetDoesNotExist); ok {
holds = []string{}
// fallthrough
} else {
return errors.Wrapf(err, "get holds of %q", v.ToAbsPath(fs))
}
} }
for _, tag := range holds { for _, tag := range holds {
listZFSHoldsAndBookmarksImplSnapshotTryParseHold(ctx, out, fs, v, tag) listZFSHoldsAndBookmarksImplSnapshotTryParseHold(ctx, out, fs, v, tag)

View File

@ -3,6 +3,7 @@ package tests
import ( import (
"fmt" "fmt"
"github.com/stretchr/testify/require"
"github.com/zrepl/zrepl/platformtest" "github.com/zrepl/zrepl/platformtest"
"github.com/zrepl/zrepl/zfs" "github.com/zrepl/zrepl/zfs"
) )
@ -13,35 +14,28 @@ func IdempotentHold(ctx *platformtest.Context) {
DESTROYROOT DESTROYROOT
CREATEROOT CREATEROOT
+ "foo bar" + "foo bar"
+ "foo bar@1" + "foo bar@1 2"
`)
defer platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
R zfs release zrepl_platformtest "${ROOTDS}/foo bar@1"
- "foo bar@1"
- "foo bar"
`) `)
fs := fmt.Sprintf("%s/foo bar", ctx.RootDataset) fs := fmt.Sprintf("%s/foo bar", ctx.RootDataset)
v1 := sendArgVersion(fs, "@1") snap := sendArgVersion(fs, "@1 2")
tag := "zrepl_platformtest" tag := "zrepl_platformtest"
err := zfs.ZFSHold(ctx, fs, v1, tag) err := zfs.ZFSHold(ctx, fs, snap, tag)
if err != nil { if err != nil {
panic(err) panic(err)
} }
err = zfs.ZFSHold(ctx, fs, v1, tag) // existing holds
if err != nil { holds, err := zfs.ZFSHolds(ctx, fs, "1 2")
panic(err) require.NoError(ctx, err)
} require.Equal(ctx, []string{tag}, holds)
vnonexistent := zfs.ZFSSendArgVersion{
RelName: "@nonexistent",
GUID: 0xbadf00d,
}
err = zfs.ZFSHold(ctx, fs, vnonexistent, tag)
if err == nil {
panic("still expecting error for nonexistent snapshot")
}
holds, err = zfs.ZFSHolds(ctx, fs, "non existent")
ctx.Logf("holds=%v", holds)
ctx.Logf("errT=%T", err)
ctx.Logf("err=%s", err)
notExist, ok := err.(*zfs.DatasetDoesNotExist)
require.True(ctx, ok)
require.Equal(ctx, fs+"@non existent", notExist.Path)
} }

View File

@ -0,0 +1,47 @@
package tests
import (
"fmt"
"github.com/zrepl/zrepl/platformtest"
"github.com/zrepl/zrepl/zfs"
)
func ListHolds(ctx *platformtest.Context) {
platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
DESTROYROOT
CREATEROOT
+ "foo bar"
+ "foo bar@1"
`)
defer platformtest.Run(ctx, platformtest.PanicErr, ctx.RootDataset, `
R zfs release zrepl_platformtest "${ROOTDS}/foo bar@1"
- "foo bar@1"
- "foo bar"
`)
fs := fmt.Sprintf("%s/foo bar", ctx.RootDataset)
v1 := sendArgVersion(fs, "@1")
tag := "zrepl_platformtest"
err := zfs.ZFSHold(ctx, fs, v1, tag)
if err != nil {
panic(err)
}
err = zfs.ZFSHold(ctx, fs, v1, tag)
if err != nil {
panic(err)
}
vnonexistent := zfs.ZFSSendArgVersion{
RelName: "@nonexistent",
GUID: 0xbadf00d,
}
err = zfs.ZFSHold(ctx, fs, vnonexistent, tag)
if err == nil {
panic("still expecting error for nonexistent snapshot")
}
}

View File

@ -31,4 +31,5 @@ var Cases = []Case{
SendArgsValidationEncryptedSendOfUnencryptedDatasetForbidden, SendArgsValidationEncryptedSendOfUnencryptedDatasetForbidden,
SendArgsValidationResumeTokenEncryptionMismatchForbidden, SendArgsValidationResumeTokenEncryptionMismatchForbidden,
SendArgsValidationResumeTokenDifferentFilesystemForbidden, SendArgsValidationResumeTokenDifferentFilesystemForbidden,
ListHolds,
} }

View File

@ -59,6 +59,7 @@ success:
return nil return nil
} }
// If the snapshot does not exist, the returned error is of type *DatasetDoesNotExist
func ZFSHolds(ctx context.Context, fs, snap string) ([]string, error) { func ZFSHolds(ctx context.Context, fs, snap string) ([]string, error) {
if err := validateZFSFilesystem(fs); err != nil { if err := validateZFSFilesystem(fs); err != nil {
return nil, errors.Wrap(err, "`fs` is not a valid filesystem path") return nil, errors.Wrap(err, "`fs` is not a valid filesystem path")