zfs: use channel as iterator for ZFSList results

The old approach with ZFSList would keep the two-dimensional array of
lines and their fields in memory (for a short time), which could easily
consume 100s of MiB with > 10000 snapshots / bookmarks (see #34)

fixes #61
This commit is contained in:
Christian Schwarz 2018-02-18 13:28:46 +01:00
parent aa92261ea7
commit 3ba3648f0f
4 changed files with 95 additions and 13 deletions

View File

@ -19,6 +19,8 @@ Developers should consult the git commit log or GitHub issue tracker.
* Make sure to understand the meaning bookmarks have for :ref:`maximum replication downtime <replication-downtime>`.
* Example: :sampleconf:`pullbackup/productionhost.yml`
* |bugfix| :issue:`61`: fix excessive memory usage
0.0.2
-----

View File

@ -1,6 +1,9 @@
package zfs
import "fmt"
import (
"context"
"fmt"
)
type DatasetFilter interface {
Filter(p *DatasetPath) (pass bool, err error)
@ -12,15 +15,21 @@ func ZFSListMapping(filter DatasetFilter) (datasets []*DatasetPath, err error) {
panic("filter must not be nil")
}
var lines [][]string
lines, err = ZFSList([]string{"name"}, "-r", "-t", "filesystem,volume")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rchan := make(chan ZFSListResult)
go ZFSListChan(ctx, rchan, []string{"name"}, "-r", "-t", "filesystem,volume")
datasets = make([]*DatasetPath, 0, len(lines))
datasets = make([]*DatasetPath, 0)
for r := range rchan {
for _, line := range lines {
if r.err != nil {
err = r.err
return
}
var path *DatasetPath
if path, err = NewDatasetPath(line[0]); err != nil {
if path, err = NewDatasetPath(r.fields[0]); err != nil {
return
}

View File

@ -2,6 +2,7 @@ package zfs
import (
"bytes"
"context"
"errors"
"fmt"
"strconv"
@ -61,17 +62,23 @@ type FilesystemVersionFilter interface {
}
func ZFSListFilesystemVersions(fs *DatasetPath, filter FilesystemVersionFilter) (res []FilesystemVersion, err error) {
var fieldLines [][]string
fieldLines, err = ZFSList(
listResults := make(chan ZFSListResult)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go ZFSListChan(ctx, listResults,
[]string{"name", "guid", "createtxg", "creation"},
"-r", "-d", "1",
"-t", "bookmark,snapshot",
"-s", "createtxg", fs.ToString())
if err != nil {
return
}
res = make([]FilesystemVersion, 0, len(fieldLines))
for _, line := range fieldLines {
res = make([]FilesystemVersion, 0)
for listResult := range listResults {
if listResult.err != nil {
return nil, listResult.err
}
line := listResult.fields
if len(line[0]) < 3 {
err = errors.New(fmt.Sprintf("snapshot or bookmark name implausibly short: %s", line[0]))

View File

@ -10,6 +10,8 @@ import (
"os/exec"
"strings"
"context"
"github.com/problame/go-rwccmd"
"github.com/zrepl/zrepl/util"
)
@ -185,6 +187,68 @@ func ZFSList(properties []string, zfsArgs ...string) (res [][]string, err error)
return
}
type ZFSListResult struct {
fields []string
err error
}
// ZFSListChan executes `zfs list` and sends the results to the `out` channel.
// The `out` channel is always closed by ZFSListChan:
// If an error occurs, it is closed after sending a result with the err field set.
// If no error occurs, it is just closed.
// If the operation is cancelled via context, the channel is just closed.
//
// However, if callers do not drain `out` or cancel via `ctx`, the process will leak either running because
// IO is pending or as a zombie.
func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []string, zfsArgs ...string) {
defer close(out)
args := make([]string, 0, 4+len(zfsArgs))
args = append(args,
"list", "-H", "-p",
"-o", strings.Join(properties, ","))
args = append(args, zfsArgs...)
sendResult := func(fields []string, err error) (done bool) {
select {
case <-ctx.Done():
return true
case out <- ZFSListResult{fields, err}:
return false
}
}
cmd, err := rwccmd.CommandContext(ctx, ZFS_BINARY, args, []string{})
if err != nil {
sendResult(nil, err)
return
}
if err = cmd.Start(); err != nil {
sendResult(nil, err)
return
}
defer cmd.Close()
s := bufio.NewScanner(cmd)
buf := make([]byte, 1024) // max line length
s.Buffer(buf, 0)
for s.Scan() {
fields := strings.SplitN(s.Text(), "\t", len(properties))
if len(fields) != len(properties) {
sendResult(nil, errors.New("unexpected output"))
return
}
if sendResult(fields, nil) {
return
}
}
if s.Err() != nil {
sendResult(nil, s.Err())
}
return
}
func ZFSSend(fs *DatasetPath, from, to *FilesystemVersion) (stream io.Reader, err error) {
args := make([]string, 0)