zfs,endpoint: use zfs destroy batch syntax if available

refs #72
This commit is contained in:
Christian Schwarz 2019-08-12 01:19:08 +02:00
parent 77d3a1ad4d
commit 07956c2299
7 changed files with 594 additions and 38 deletions

View File

@ -31,6 +31,7 @@ We use the following annotations for classifying changes:
* Linux ARM64 Docker build support & binary builds * Linux ARM64 Docker build support & binary builds
* Go modules for dependency management both inside and outside of GOPATH * Go modules for dependency management both inside and outside of GOPATH
(``lazy.sh`` and ``Makefile`` force ``GO111MODULE=on``) (``lazy.sh`` and ``Makefile`` force ``GO111MODULE=on``)
* |feature| Use ``zfs destroy pool/fs@snap1,snap2,...`` CLI feature if available
0.1.1 0.1.1
----- -----

View File

@ -454,30 +454,34 @@ func (s *Receiver) DestroySnapshots(ctx context.Context, req *pdu.DestroySnapsho
} }
func doDestroySnapshots(ctx context.Context, lp *zfs.DatasetPath, snaps []*pdu.FilesystemVersion) (*pdu.DestroySnapshotsRes, error) { func doDestroySnapshots(ctx context.Context, lp *zfs.DatasetPath, snaps []*pdu.FilesystemVersion) (*pdu.DestroySnapshotsRes, error) {
fsvs := make([]*zfs.FilesystemVersion, len(snaps)) reqs := make([]*zfs.DestroySnapOp, len(snaps))
ress := make([]*pdu.DestroySnapshotRes, len(snaps))
errs := make([]error, len(snaps))
for i, fsv := range snaps { for i, fsv := range snaps {
if fsv.Type != pdu.FilesystemVersion_Snapshot { if fsv.Type != pdu.FilesystemVersion_Snapshot {
return nil, fmt.Errorf("version %q is not a snapshot", fsv.Name) return nil, fmt.Errorf("version %q is not a snapshot", fsv.Name)
} }
var err error ress[i] = &pdu.DestroySnapshotRes{
fsvs[i], err = fsv.ZFSFilesystemVersion() Snapshot: fsv,
if err != nil { // Error set after batch operation
return nil, err }
reqs[i] = &zfs.DestroySnapOp{
Filesystem: lp.ToString(),
Name: fsv.Name,
ErrOut: &errs[i],
} }
} }
res := &pdu.DestroySnapshotsRes{ zfs.ZFSDestroyFilesystemVersions(reqs)
Results: make([]*pdu.DestroySnapshotRes, len(fsvs)), for i := range reqs {
} if errs[i] != nil {
for i, fsv := range fsvs { if de, ok := errs[i].(*zfs.DestroySnapshotsError); ok && len(de.Reason) == 1 {
err := zfs.ZFSDestroyFilesystemVersion(lp, fsv) ress[i].Error = de.Reason[0]
errMsg := "" } else {
if err != nil { ress[i].Error = errs[i].Error()
errMsg = err.Error()
}
res.Results[i] = &pdu.DestroySnapshotRes{
Snapshot: pdu.FilesystemVersionFromZFS(fsv),
Error: errMsg,
} }
} }
return res, nil }
return &pdu.DestroySnapshotsRes{
Results: ress,
}, nil
} }

View File

@ -25,6 +25,23 @@ func Duration(varname string, def time.Duration) time.Duration {
return d return d
} }
func Int(varname string, def int) int {
if v, ok := cache.Load(varname); ok {
return v.(int)
}
e := os.Getenv(varname)
if e == "" {
return def
}
d64, err := strconv.ParseInt(e, 10, strconv.IntSize)
if err != nil {
panic(err)
}
d := int(d64)
cache.Store(varname, d)
return d
}
func Int64(varname string, def int64) int64 { func Int64(varname string, def int64) int64 {
if v, ok := cache.Load(varname); ok { if v, ok := cache.Load(varname); ok {
return v.(int64) return v.(int64)

View File

@ -154,15 +154,3 @@ func ZFSListFilesystemVersions(fs *DatasetPath, filter FilesystemVersionFilter)
} }
return return
} }
func ZFSDestroyFilesystemVersion(filesystem *DatasetPath, version *FilesystemVersion) (err error) {
datasetPath := version.ToAbsPath(filesystem)
// Sanity check...
if !strings.ContainsAny(datasetPath, "@#") {
return fmt.Errorf("sanity check failed: no @ or # character found in %q", datasetPath)
}
return ZFSDestroy(datasetPath)
}

234
zfs/versions_destroy.go Normal file
View File

@ -0,0 +1,234 @@
package zfs
import (
"context"
"fmt"
"os"
"os/exec"
"sort"
"strings"
"sync"
"syscall"
"github.com/zrepl/zrepl/util/envconst"
)
func ZFSDestroyFilesystemVersion(filesystem *DatasetPath, version *FilesystemVersion) (err error) {
datasetPath := version.ToAbsPath(filesystem)
// Sanity check...
if !strings.ContainsAny(datasetPath, "@#") {
return fmt.Errorf("sanity check failed: no @ or # character found in %q", datasetPath)
}
return ZFSDestroy(datasetPath)
}
var destroyerSingleton = destroyerImpl{}
type DestroySnapOp struct {
Filesystem string
Name string
ErrOut *error
}
func (o *DestroySnapOp) String() string {
return fmt.Sprintf("destroy operation %s@%s", o.Filesystem, o.Name)
}
func ZFSDestroyFilesystemVersions(reqs []*DestroySnapOp) {
doDestroy(context.TODO(), reqs, destroyerSingleton)
}
func setDestroySnapOpErr(b []*DestroySnapOp, err error) {
for _, r := range b {
*r.ErrOut = err
}
}
type destroyer interface {
Destroy(args []string) error
DestroySnapshotsCommaSyntaxSupported() (bool, error)
}
func doDestroy(ctx context.Context, reqs []*DestroySnapOp, e destroyer) {
var validated []*DestroySnapOp
for _, req := range reqs {
// Filesystem and Snapshot should not be empty
// ZFS will generally fail because those are invalid destroy arguments,
// but we'd rather apply defensive programming here (doing destroy after all)
if req.Filesystem == "" {
*req.ErrOut = fmt.Errorf("Filesystem must not be an empty string")
} else if req.Name == "" {
*req.ErrOut = fmt.Errorf("Name must not be an empty string")
} else {
validated = append(validated, req)
}
}
reqs = validated
commaSupported, err := e.DestroySnapshotsCommaSyntaxSupported()
if err != nil {
debug("destroy: comma syntax support detection failed: %s", err)
setDestroySnapOpErr(reqs, err)
return
}
if !commaSupported {
doDestroySeq(ctx, reqs, e)
} else {
doDestroyBatched(ctx, reqs, e)
}
}
func doDestroySeq(ctx context.Context, reqs []*DestroySnapOp, e destroyer) {
for _, r := range reqs {
*r.ErrOut = e.Destroy([]string{fmt.Sprintf("%s@%s", r.Filesystem, r.Name)})
}
}
func doDestroyBatched(ctx context.Context, reqs []*DestroySnapOp, d destroyer) {
perFS := buildBatches(reqs)
for _, fsbatch := range perFS {
doDestroyBatchedRec(ctx, fsbatch, d)
}
}
func buildBatches(reqs []*DestroySnapOp) [][]*DestroySnapOp {
if len(reqs) == 0 {
return nil
}
sorted := make([]*DestroySnapOp, len(reqs))
copy(sorted, reqs)
sort.SliceStable(sorted, func(i, j int) bool {
// by filesystem, then snap name
fscmp := strings.Compare(sorted[i].Filesystem, sorted[j].Filesystem)
if fscmp != 0 {
return fscmp == -1
}
return strings.Compare(sorted[i].Name, sorted[j].Name) == -1
})
// group by fs
var perFS [][]*DestroySnapOp
consumed := 0
for consumed < len(sorted) {
batchConsumedUntil := consumed
for ; batchConsumedUntil < len(sorted) && sorted[batchConsumedUntil].Filesystem == sorted[consumed].Filesystem; batchConsumedUntil++ {
}
perFS = append(perFS, sorted[consumed:batchConsumedUntil])
consumed = batchConsumedUntil
}
return perFS
}
// batch must be on same Filesystem, panics otherwise
func tryBatch(ctx context.Context, batch []*DestroySnapOp, d destroyer) error {
batchFS := batch[0].Filesystem
batchNames := make([]string, len(batch))
for i := range batchNames {
batchNames[i] = batch[i].Name
if batchFS != batch[i].Filesystem {
panic("inconsistent batch")
}
}
batchArg := fmt.Sprintf("%s@%s", batchFS, strings.Join(batchNames, ","))
return d.Destroy([]string{batchArg})
}
// fsbatch must be on same filesystem
func doDestroyBatchedRec(ctx context.Context, fsbatch []*DestroySnapOp, d destroyer) {
if len(fsbatch) <= 1 {
doDestroySeq(ctx, fsbatch, d)
return
}
err := tryBatch(ctx, fsbatch, d)
if err == nil {
setDestroySnapOpErr(fsbatch, nil)
return
}
if pe, ok := err.(*os.PathError); ok && pe.Err == syscall.E2BIG {
// see TestExcessiveArgumentsResultInE2BIG
// try halving batch size, assuming snapshots names are roughly the same length
debug("batch destroy: E2BIG encountered: %s", err)
doDestroyBatchedRec(ctx, fsbatch[0:len(fsbatch)/2], d)
doDestroyBatchedRec(ctx, fsbatch[len(fsbatch)/2:], d)
return
}
singleRun := fsbatch // the destroys that will be tried sequentially after "smart" error handling below
if err, ok := err.(*DestroySnapshotsError); ok {
// eliminate undestroyable datasets from batch and try it once again
strippedBatch, remaining := make([]*DestroySnapOp, 0, len(fsbatch)), make([]*DestroySnapOp, 0, len(fsbatch))
for _, b := range fsbatch {
isUndestroyable := false
for _, undestroyable := range err.Undestroyable {
if undestroyable == b.Name {
isUndestroyable = true
break
}
}
if isUndestroyable {
remaining = append(remaining, b)
} else {
strippedBatch = append(strippedBatch, b)
}
}
err := tryBatch(ctx, strippedBatch, d)
if err != nil {
// run entire batch sequentially if the stripped one fails
// (it shouldn't because we stripped erronous datasets)
singleRun = fsbatch // shadow
} else {
setDestroySnapOpErr(strippedBatch, nil) // these ones worked
singleRun = remaining // shadow
}
// fallthrough
}
doDestroySeq(ctx, singleRun, d)
}
type destroyerImpl struct{}
func (d destroyerImpl) Destroy(args []string) error {
if len(args) != 1 {
// we have no use case for this at the moment, so let's crash (safer than destroying something unexpectedly)
panic(fmt.Sprintf("unexpected number of arguments: %v", args))
}
// we know that we are only using this for snapshots, so also sanity check for an @ in args[0]
if !strings.ContainsAny(args[0], "@") {
panic(fmt.Sprintf("sanity check: expecting '@' in call to Destroy, got %q", args[0]))
}
return ZFSDestroy(args[0])
}
var batchDestroyFeatureCheck struct {
once sync.Once
enable bool
err error
}
func (d destroyerImpl) DestroySnapshotsCommaSyntaxSupported() (bool, error) {
batchDestroyFeatureCheck.once.Do(func() {
// "feature discovery"
cmd := exec.Command("zfs", "destroy")
output, err := cmd.CombinedOutput()
if _, ok := err.(*exec.ExitError); !ok {
debug("destroy feature check failed: %T %s", err, err)
batchDestroyFeatureCheck.err = err
}
def := strings.Contains(string(output), "<filesystem|volume>@<snap>[%<snap>][,...]")
batchDestroyFeatureCheck.enable = envconst.Bool("ZREPL_EXPERIMENTAL_ZFS_COMMA_SYNTAX_SUPPORTED", def)
debug("destroy feature check complete %#v", &batchDestroyFeatureCheck)
})
return batchDestroyFeatureCheck.enable, batchDestroyFeatureCheck.err
}

View File

@ -0,0 +1,248 @@
package zfs
import (
"bytes"
"context"
"fmt"
"os"
"os/exec"
"strings"
"syscall"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zrepl/zrepl/util/chainlock"
)
type mockBatchDestroy struct {
mtx chainlock.L
calls []string
commaUnsupported bool
undestroyable string
randomerror string
e2biglen int
}
func (m *mockBatchDestroy) DestroySnapshotsCommaSyntaxSupported() (bool, error) {
return !m.commaUnsupported, nil
}
func (m *mockBatchDestroy) Destroy(args []string) error {
defer m.mtx.Lock().Unlock()
if len(args) != 1 {
panic("unexpected use of Destroy")
}
a := args[0]
if m.e2biglen > 0 && len(a) > m.e2biglen {
return &os.PathError{Err: syscall.E2BIG} // TestExcessiveArgumentsResultInE2BIG checks that this errors is produced
}
m.calls = append(m.calls, a)
if m.commaUnsupported {
if strings.Contains(a, ",") {
return fmt.Errorf("unsupported syntax mock error")
}
}
if m.undestroyable != "" && strings.Contains(a, m.undestroyable) {
return &DestroySnapshotsError{
Filesystem: "PLACEHOLDER",
Undestroyable: []string{m.undestroyable},
Reason: []string{"undestroyable reason"},
}
}
if m.randomerror != "" && strings.Contains(a, m.randomerror) {
return fmt.Errorf("randomerror")
}
return nil
}
func TestBatchDestroySnaps(t *testing.T) {
errs := make([]error, 10)
nilErrs := func() {
for i := range errs {
errs[i] = nil
}
}
opsTemplate := []*DestroySnapOp{
&DestroySnapOp{"zroot/z", "foo", &errs[0]},
&DestroySnapOp{"zroot/a", "foo", &errs[1]},
&DestroySnapOp{"zroot/a", "bar", &errs[2]},
&DestroySnapOp{"zroot/b", "bar", &errs[3]},
&DestroySnapOp{"zroot/b", "zab", &errs[4]},
&DestroySnapOp{"zroot/b", "undestroyable", &errs[5]},
&DestroySnapOp{"zroot/c", "baz", &errs[6]},
&DestroySnapOp{"zroot/c", "randomerror", &errs[7]},
&DestroySnapOp{"zroot/c", "bar", &errs[8]},
&DestroySnapOp{"zroot/d", "blup", &errs[9]},
}
t.Run("single_undestroyable_dataset", func(t *testing.T) {
nilErrs()
mock := &mockBatchDestroy{
commaUnsupported: false,
undestroyable: "undestroyable",
randomerror: "randomerror",
}
doDestroy(context.TODO(), opsTemplate, mock)
assert.NoError(t, errs[0])
assert.NoError(t, errs[1])
assert.NoError(t, errs[2])
assert.NoError(t, errs[3])
assert.NoError(t, errs[4])
assert.Error(t, errs[5], "undestroyable")
assert.NoError(t, errs[6])
assert.Error(t, errs[7], "randomerror")
assert.NoError(t, errs[8])
assert.NoError(t, errs[9])
defer mock.mtx.Lock().Unlock()
assert.Equal(
t,
[]string{
"zroot/a@bar,foo", // reordered snaps in lexicographical order
"zroot/b@bar,undestroyable,zab",
"zroot/b@bar,zab", // eliminate undestroyables, try others again
"zroot/b@undestroyable",
"zroot/c@bar,baz,randomerror",
"zroot/c@bar", // fallback to single-snapshot on non DestroyError
"zroot/c@baz",
"zroot/c@randomerror",
"zroot/d@blup",
"zroot/z@foo", // ordered at last position
},
mock.calls,
)
})
t.Run("comma_syntax_unsupported", func(t *testing.T) {
nilErrs()
mock := &mockBatchDestroy{
commaUnsupported: true,
undestroyable: "undestroyable",
randomerror: "randomerror",
}
doDestroy(context.TODO(), opsTemplate, mock)
assert.NoError(t, errs[0])
assert.NoError(t, errs[1])
assert.NoError(t, errs[2])
assert.NoError(t, errs[3])
assert.NoError(t, errs[4])
assert.Error(t, errs[5], "undestroyable")
assert.NoError(t, errs[6])
assert.Error(t, errs[7], "randomerror")
assert.NoError(t, errs[8])
assert.NoError(t, errs[9])
defer mock.mtx.Lock().Unlock()
assert.Equal(
t,
[]string{
// expect exactly argument order
"zroot/z@foo",
"zroot/a@foo",
"zroot/a@bar",
"zroot/b@bar",
"zroot/b@zab",
"zroot/b@undestroyable",
"zroot/c@baz",
"zroot/c@randomerror",
"zroot/c@bar",
"zroot/d@blup",
},
mock.calls,
)
})
t.Run("empty_ops", func(t *testing.T) {
mock := &mockBatchDestroy{}
doDestroy(context.TODO(), nil, mock)
defer mock.mtx.Lock().Unlock()
assert.Empty(t, mock.calls)
})
t.Run("ops_without_snapnames", func(t *testing.T) {
mock := &mockBatchDestroy{}
var err error
ops := []*DestroySnapOp{&DestroySnapOp{"somefs", "", &err}}
doDestroy(context.TODO(), ops, mock)
assert.Error(t, err)
defer mock.mtx.Lock().Unlock()
assert.Empty(t, mock.calls)
})
t.Run("ops_without_fsnames", func(t *testing.T) {
mock := &mockBatchDestroy{}
var err error
ops := []*DestroySnapOp{&DestroySnapOp{"", "fsname", &err}}
doDestroy(context.TODO(), ops, mock)
assert.Error(t, err)
defer mock.mtx.Lock().Unlock()
assert.Empty(t, mock.calls)
})
t.Run("splits_up_batches_at_e2big", func(t *testing.T) {
mock := &mockBatchDestroy{
e2biglen: 10,
}
var dummy error
reqs := []*DestroySnapOp{
// should fit (1111@a,b,c)
&DestroySnapOp{"1111", "a", &dummy},
&DestroySnapOp{"1111", "b", &dummy},
&DestroySnapOp{"1111", "c", &dummy},
// should split
&DestroySnapOp{"2222", "01", &dummy},
&DestroySnapOp{"2222", "02", &dummy},
&DestroySnapOp{"2222", "03", &dummy},
&DestroySnapOp{"2222", "04", &dummy},
&DestroySnapOp{"2222", "05", &dummy},
&DestroySnapOp{"2222", "06", &dummy},
&DestroySnapOp{"2222", "07", &dummy},
&DestroySnapOp{"2222", "08", &dummy},
&DestroySnapOp{"2222", "09", &dummy},
&DestroySnapOp{"2222", "10", &dummy},
}
doDestroy(context.TODO(), reqs, mock)
defer mock.mtx.Lock().Unlock()
assert.Equal(
t,
[]string{
"1111@a,b,c",
"2222@01,02",
"2222@03",
"2222@04,05",
"2222@06,07",
"2222@08",
"2222@09,10",
},
mock.calls,
)
})
}
func TestExcessiveArgumentsResultInE2BIG(t *testing.T) {
// FIXME dynamic value
const maxArgumentLength = 1 << 20 // higher than any OS we know, should always fail
t.Logf("maxArgumentLength=%v", maxArgumentLength)
maxArg := bytes.Repeat([]byte("a"), maxArgumentLength)
cmd := exec.Command("/bin/sh", "-c", "echo -n $1; echo -n $2", "cmdname", string(maxArg), string(maxArg))
output, err := cmd.CombinedOutput()
if pe, ok := err.(*os.PathError); ok && pe.Err == syscall.E2BIG {
t.Logf("ok, system returns E2BIG")
} else {
t.Errorf("system did not return E2BIG, but err=%T: %v ", err, err)
t.Logf("output:\n%s", output)
}
}

View File

@ -1020,29 +1020,89 @@ func zfsGet(path string, props []string, allowedSources zfsPropertySource) (*ZFS
return res, nil return res, nil
} }
func ZFSDestroy(dataset string) (err error) { type DestroySnapshotsError struct {
RawLines []string
Filesystem string
Undestroyable []string // snapshot name only (filesystem@ stripped)
Reason []string
}
func (e *DestroySnapshotsError) Error() string {
if len(e.Undestroyable) != len(e.Reason) {
panic(fmt.Sprintf("%v != %v", len(e.Undestroyable), len(e.Reason)))
}
if len(e.Undestroyable) == 0 {
panic(fmt.Sprintf("error must have one undestroyable snapshot, %q", e.Filesystem))
}
if len(e.Undestroyable) == 1 {
return fmt.Sprintf("zfs destroy failed: %s@%s: %s", e.Filesystem, e.Undestroyable[0], e.Reason[0])
}
return strings.Join(e.RawLines, "\n")
}
var destroySnapshotsErrorRegexp = regexp.MustCompile(`^cannot destroy snapshot ([^@]+)@(.+): (.*)$`) // yes, datasets can contain `:`
func tryParseDestroySnapshotsError(arg string, stderr []byte) *DestroySnapshotsError {
argComps := strings.SplitN(arg, "@", 2)
if len(argComps) != 2 {
return nil
}
filesystem := argComps[0]
lines := bufio.NewScanner(bytes.NewReader(stderr))
undestroyable := []string{}
reason := []string{}
rawLines := []string{}
for lines.Scan() {
line := lines.Text()
rawLines = append(rawLines, line)
m := destroySnapshotsErrorRegexp.FindStringSubmatch(line)
if m == nil {
return nil // unexpected line => be conservative
} else {
if m[1] != filesystem {
return nil // unexpected line => be conservative
}
undestroyable = append(undestroyable, m[2])
reason = append(reason, m[3])
}
}
if len(undestroyable) == 0 {
return nil
}
return &DestroySnapshotsError{
RawLines: rawLines,
Filesystem: filesystem,
Undestroyable: undestroyable,
Reason: reason,
}
}
func ZFSDestroy(arg string) (err error) {
var dstype, filesystem string var dstype, filesystem string
idx := strings.IndexAny(dataset, "@#") idx := strings.IndexAny(arg, "@#")
if idx == -1 { if idx == -1 {
dstype = "filesystem" dstype = "filesystem"
filesystem = dataset filesystem = arg
} else { } else {
switch dataset[idx] { switch arg[idx] {
case '@': case '@':
dstype = "snapshot" dstype = "snapshot"
case '#': case '#':
dstype = "bookmark" dstype = "bookmark"
} }
filesystem = dataset[:idx] filesystem = arg[:idx]
} }
defer prometheus.NewTimer(prom.ZFSDestroyDuration.WithLabelValues(dstype, filesystem)) defer prometheus.NewTimer(prom.ZFSDestroyDuration.WithLabelValues(dstype, filesystem))
cmd := exec.Command(ZFS_BINARY, "destroy", dataset) cmd := exec.Command(ZFS_BINARY, "destroy", arg)
stderr := bytes.NewBuffer(make([]byte, 0, 1024)) var stderr bytes.Buffer
cmd.Stderr = stderr cmd.Stderr = &stderr
if err = cmd.Start(); err != nil { if err = cmd.Start(); err != nil {
return err return err
@ -1053,6 +1113,10 @@ func ZFSDestroy(dataset string) (err error) {
Stderr: stderr.Bytes(), Stderr: stderr.Bytes(),
WaitErr: err, WaitErr: err,
} }
if dserr := tryParseDestroySnapshotsError(arg, stderr.Bytes()); dserr != nil {
err = dserr
}
} }
return return