From 030bd7affebe1de3db1824d489fba264273df1f1 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 7 May 2017 12:18:54 +0200 Subject: [PATCH] zfs: implement ZFSSend --- zfs/diff.go | 21 ++++++++++++++ zfs/fork_reader.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++ zfs/zfs.go | 16 +++++++++++ 3 files changed, 109 insertions(+) create mode 100644 zfs/fork_reader.go diff --git a/zfs/diff.go b/zfs/diff.go index b776515..40988d5 100644 --- a/zfs/diff.go +++ b/zfs/diff.go @@ -1,6 +1,7 @@ package zfs import ( + "bytes" "errors" "fmt" "sort" @@ -15,6 +16,18 @@ const ( Snapshot = "snapshot" ) +func (t VersionType) DelimiterChar() string { + switch t { + case Bookmark: + return "#" + case Snapshot: + return "@" + default: + panic(fmt.Sprintf("unexpected VersionType %#v", t)) + } + return "" +} + type FilesystemVersion struct { Type VersionType @@ -29,6 +42,14 @@ type FilesystemVersion struct { CreateTXG uint64 } +func (v FilesystemVersion) ToAbsPath(p DatasetPath) string { + var b bytes.Buffer + b.WriteString(p.ToString()) + b.WriteString(v.Type.DelimiterChar()) + b.WriteString(v.Name) + return b.String() +} + type fsbyCreateTXG []FilesystemVersion func (l fsbyCreateTXG) Len() int { return len(l) } diff --git a/zfs/fork_reader.go b/zfs/fork_reader.go new file mode 100644 index 0000000..988fbbe --- /dev/null +++ b/zfs/fork_reader.go @@ -0,0 +1,72 @@ +package zfs + +import ( + "bytes" + "context" + "io" + "os" + "os/exec" + "sync" +) + +// A ForkReader is an io.Reader for a forked process's stdout. +// It Wait()s for the process to exit and - if it exits with error - returns this exit error +// on subsequent Read()s. +type ForkReader struct { + cancelFunc context.CancelFunc + cmd *exec.Cmd + stdout io.Reader + waitErr error + exitWaitGroup sync.WaitGroup +} + +func NewForkReader(command string, args ...string) (r *ForkReader, err error) { + + r = &ForkReader{} + + var ctx context.Context + ctx, r.cancelFunc = context.WithCancel(context.Background()) + + cmd := exec.CommandContext(ctx, command, args...) + + stderr := bytes.NewBuffer(make([]byte, 0, 1024)) + cmd.Stderr = stderr + + if r.stdout, err = cmd.StdoutPipe(); err != nil { + return + } + + if err = cmd.Start(); err != nil { + return + } + r.exitWaitGroup.Add(1) + + go func() { + defer r.exitWaitGroup.Done() + os.Stderr.WriteString("waiting") + if err := cmd.Wait(); err != nil { + os.Stderr.WriteString(err.Error()) + r.waitErr = ZFSError{ + Stderr: stderr.Bytes(), + WaitErr: err, + } + return + } + os.Stderr.WriteString("exited") + }() + return +} + +func (r *ForkReader) Read(buf []byte) (n int, err error) { + if r.waitErr != nil { + return 0, r.waitErr + } + if n, err = r.stdout.Read(buf); err == io.EOF { + // the command has exited but we need to wait for Wait()ing goroutine to finish + r.exitWaitGroup.Wait() + if r.waitErr != nil { + err = r.waitErr + } + } + return +} diff --git a/zfs/zfs.go b/zfs/zfs.go index 9649fdb..76110a6 100644 --- a/zfs/zfs.go +++ b/zfs/zfs.go @@ -102,3 +102,19 @@ func ZFSList(properties []string, zfsArgs ...string) (res [][]string, err error) } return } + +func ZFSSend(fs DatasetPath, from, to *FilesystemVersion) (stream io.Reader, err error) { + + args := make([]string, 0) + args = append(args, "send") + + if to == nil { // Initial + args = append(args, from.ToAbsPath(fs)) + } else { + args = append(args, "-i", from.ToAbsPath(fs), to.ToAbsPath(fs)) + } + + stream, err = NewForkReader(ZFS_BINARY, args...) + + return +}