mirror of
https://github.com/zrepl/zrepl.git
synced 2025-01-22 06:09:45 +01:00
zfs: implement ZFSSend
This commit is contained in:
parent
1a92717894
commit
030bd7affe
21
zfs/diff.go
21
zfs/diff.go
@ -1,6 +1,7 @@
|
||||
package zfs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
@ -15,6 +16,18 @@ const (
|
||||
Snapshot = "snapshot"
|
||||
)
|
||||
|
||||
func (t VersionType) DelimiterChar() string {
|
||||
switch t {
|
||||
case Bookmark:
|
||||
return "#"
|
||||
case Snapshot:
|
||||
return "@"
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected VersionType %#v", t))
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type FilesystemVersion struct {
|
||||
Type VersionType
|
||||
|
||||
@ -29,6 +42,14 @@ type FilesystemVersion struct {
|
||||
CreateTXG uint64
|
||||
}
|
||||
|
||||
func (v FilesystemVersion) ToAbsPath(p DatasetPath) string {
|
||||
var b bytes.Buffer
|
||||
b.WriteString(p.ToString())
|
||||
b.WriteString(v.Type.DelimiterChar())
|
||||
b.WriteString(v.Name)
|
||||
return b.String()
|
||||
}
|
||||
|
||||
type fsbyCreateTXG []FilesystemVersion
|
||||
|
||||
func (l fsbyCreateTXG) Len() int { return len(l) }
|
||||
|
72
zfs/fork_reader.go
Normal file
72
zfs/fork_reader.go
Normal file
@ -0,0 +1,72 @@
|
||||
package zfs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// A ForkReader is an io.Reader for a forked process's stdout.
|
||||
// It Wait()s for the process to exit and - if it exits with error - returns this exit error
|
||||
// on subsequent Read()s.
|
||||
type ForkReader struct {
|
||||
cancelFunc context.CancelFunc
|
||||
cmd *exec.Cmd
|
||||
stdout io.Reader
|
||||
waitErr error
|
||||
exitWaitGroup sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewForkReader(command string, args ...string) (r *ForkReader, err error) {
|
||||
|
||||
r = &ForkReader{}
|
||||
|
||||
var ctx context.Context
|
||||
ctx, r.cancelFunc = context.WithCancel(context.Background())
|
||||
|
||||
cmd := exec.CommandContext(ctx, command, args...)
|
||||
|
||||
stderr := bytes.NewBuffer(make([]byte, 0, 1024))
|
||||
cmd.Stderr = stderr
|
||||
|
||||
if r.stdout, err = cmd.StdoutPipe(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err = cmd.Start(); err != nil {
|
||||
return
|
||||
}
|
||||
r.exitWaitGroup.Add(1)
|
||||
|
||||
go func() {
|
||||
defer r.exitWaitGroup.Done()
|
||||
os.Stderr.WriteString("waiting")
|
||||
if err := cmd.Wait(); err != nil {
|
||||
os.Stderr.WriteString(err.Error())
|
||||
r.waitErr = ZFSError{
|
||||
Stderr: stderr.Bytes(),
|
||||
WaitErr: err,
|
||||
}
|
||||
return
|
||||
}
|
||||
os.Stderr.WriteString("exited")
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
||||
func (r *ForkReader) Read(buf []byte) (n int, err error) {
|
||||
if r.waitErr != nil {
|
||||
return 0, r.waitErr
|
||||
}
|
||||
if n, err = r.stdout.Read(buf); err == io.EOF {
|
||||
// the command has exited but we need to wait for Wait()ing goroutine to finish
|
||||
r.exitWaitGroup.Wait()
|
||||
if r.waitErr != nil {
|
||||
err = r.waitErr
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
16
zfs/zfs.go
16
zfs/zfs.go
@ -102,3 +102,19 @@ func ZFSList(properties []string, zfsArgs ...string) (res [][]string, err error)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func ZFSSend(fs DatasetPath, from, to *FilesystemVersion) (stream io.Reader, err error) {
|
||||
|
||||
args := make([]string, 0)
|
||||
args = append(args, "send")
|
||||
|
||||
if to == nil { // Initial
|
||||
args = append(args, from.ToAbsPath(fs))
|
||||
} else {
|
||||
args = append(args, "-i", from.ToAbsPath(fs), to.ToAbsPath(fs))
|
||||
}
|
||||
|
||||
stream, err = NewForkReader(ZFS_BINARY, args...)
|
||||
|
||||
return
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user