mirror of
https://github.com/zrepl/zrepl.git
synced 2025-02-16 10:29:54 +01:00
Gofmt megacommit.
This commit is contained in:
parent
9750bf3123
commit
d9ecfc8eb4
@ -1,29 +1,29 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
yaml "gopkg.in/yaml.v2"
|
"errors"
|
||||||
"io/ioutil"
|
|
||||||
"github.com/mitchellh/mapstructure"
|
"github.com/mitchellh/mapstructure"
|
||||||
"github.com/zrepl/zrepl/zfs"
|
"github.com/zrepl/zrepl/zfs"
|
||||||
"errors"
|
yaml "gopkg.in/yaml.v2"
|
||||||
|
"io/ioutil"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Pool struct {
|
type Pool struct {
|
||||||
Name string
|
Name string
|
||||||
Url string
|
Url string
|
||||||
}
|
}
|
||||||
type Push struct {
|
type Push struct {
|
||||||
To string
|
To string
|
||||||
Datasets []zfs.DatasetPath
|
Datasets []zfs.DatasetPath
|
||||||
}
|
}
|
||||||
type Pull struct {
|
type Pull struct {
|
||||||
From string
|
From string
|
||||||
Mapping zfs.DatasetMapping
|
Mapping zfs.DatasetMapping
|
||||||
}
|
}
|
||||||
type Sink struct {
|
type Sink struct {
|
||||||
From string
|
From string
|
||||||
Mapping zfs.DatasetMapping
|
Mapping zfs.DatasetMapping
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
@ -74,8 +74,8 @@ func parsePools(v interface{}) (p []Pool, err error) {
|
|||||||
|
|
||||||
func parsePushs(v interface{}) (p []Push, err error) {
|
func parsePushs(v interface{}) (p []Push, err error) {
|
||||||
|
|
||||||
asList := make([]struct{
|
asList := make([]struct {
|
||||||
To string
|
To string
|
||||||
Datasets []string
|
Datasets []string
|
||||||
}, 0)
|
}, 0)
|
||||||
|
|
||||||
@ -87,7 +87,7 @@ func parsePushs(v interface{}) (p []Push, err error) {
|
|||||||
|
|
||||||
for _, e := range asList {
|
for _, e := range asList {
|
||||||
push := Push{
|
push := Push{
|
||||||
To: e.To,
|
To: e.To,
|
||||||
Datasets: make([]zfs.DatasetPath, len(e.Datasets)),
|
Datasets: make([]zfs.DatasetPath, len(e.Datasets)),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -105,12 +105,11 @@ func parsePushs(v interface{}) (p []Push, err error) {
|
|||||||
|
|
||||||
func parsePulls(v interface{}) (p []Pull, err error) {
|
func parsePulls(v interface{}) (p []Pull, err error) {
|
||||||
|
|
||||||
asList := make([]struct{
|
asList := make([]struct {
|
||||||
From string
|
From string
|
||||||
Mapping map[string]string
|
Mapping map[string]string
|
||||||
}, 0)
|
}, 0)
|
||||||
|
|
||||||
|
|
||||||
if err = mapstructure.Decode(v, &asList); err != nil {
|
if err = mapstructure.Decode(v, &asList); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -134,7 +133,7 @@ func parseSinks(v interface{}) (s []Sink, err error) {
|
|||||||
|
|
||||||
var asList []interface{}
|
var asList []interface{}
|
||||||
var ok bool
|
var ok bool
|
||||||
if asList, ok = v.([]interface{}); !ok {
|
if asList, ok = v.([]interface{}); !ok {
|
||||||
return nil, errors.New("expected list")
|
return nil, errors.New("expected list")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -153,7 +152,7 @@ func parseSinks(v interface{}) (s []Sink, err error) {
|
|||||||
|
|
||||||
func parseSink(v interface{}) (s Sink, err error) {
|
func parseSink(v interface{}) (s Sink, err error) {
|
||||||
t := struct {
|
t := struct {
|
||||||
From string
|
From string
|
||||||
Mapping map[string]string
|
Mapping map[string]string
|
||||||
}{}
|
}{}
|
||||||
if err = mapstructure.Decode(v, &t); err != nil {
|
if err = mapstructure.Decode(v, &t); err != nil {
|
||||||
@ -169,7 +168,7 @@ func parseComboMapping(m map[string]string) (c zfs.ComboMapping, err error) {
|
|||||||
|
|
||||||
c.Mappings = make([]zfs.DatasetMapping, len(m))
|
c.Mappings = make([]zfs.DatasetMapping, len(m))
|
||||||
|
|
||||||
for lhs,rhs := range m {
|
for lhs, rhs := range m {
|
||||||
|
|
||||||
if lhs[0] == '|' {
|
if lhs[0] == '|' {
|
||||||
|
|
||||||
@ -178,7 +177,7 @@ func parseComboMapping(m map[string]string) (c zfs.ComboMapping, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
m := zfs.DirectMapping{
|
m := zfs.DirectMapping{
|
||||||
Source: nil,
|
Source: nil,
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.Target, err = zfs.NewDatasetPath(rhs); err != nil {
|
if m.Target, err = zfs.NewDatasetPath(rhs); err != nil {
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSampleConfigFileIsParsedWithoutErrors(t *testing.T) {
|
func TestSampleConfigFileIsParsedWithoutErrors(t *testing.T) {
|
||||||
|
@ -1,12 +1,13 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
|
||||||
"github.com/zrepl/zrepl/zfs"
|
|
||||||
"github.com/zrepl/zrepl/model"
|
"github.com/zrepl/zrepl/model"
|
||||||
"github.com/zrepl/zrepl/rpc"
|
"github.com/zrepl/zrepl/rpc"
|
||||||
|
"github.com/zrepl/zrepl/zfs"
|
||||||
|
"io"
|
||||||
)
|
)
|
||||||
type Handler struct {}
|
|
||||||
|
type Handler struct{}
|
||||||
|
|
||||||
func (h Handler) HandleFilesystemRequest(r rpc.FilesystemRequest) (roots []model.Filesystem, err error) {
|
func (h Handler) HandleFilesystemRequest(r rpc.FilesystemRequest) (roots []model.Filesystem, err error) {
|
||||||
|
|
||||||
|
40
cmd/main.go
40
cmd/main.go
@ -1,18 +1,18 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/urfave/cli"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"github.com/urfave/cli"
|
||||||
"github.com/zrepl/zrepl/sshbytestream"
|
|
||||||
"github.com/zrepl/zrepl/rpc"
|
"github.com/zrepl/zrepl/rpc"
|
||||||
|
"github.com/zrepl/zrepl/sshbytestream"
|
||||||
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Role uint
|
type Role uint
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ROLE_IPC Role = iota
|
ROLE_IPC Role = iota
|
||||||
ROLE_ACTION Role = iota
|
ROLE_ACTION Role = iota
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -29,7 +29,7 @@ func main() {
|
|||||||
app.Flags = []cli.Flag{
|
app.Flags = []cli.Flag{
|
||||||
cli.StringFlag{Name: "config"},
|
cli.StringFlag{Name: "config"},
|
||||||
}
|
}
|
||||||
app.Before = func (c *cli.Context) (err error) {
|
app.Before = func(c *cli.Context) (err error) {
|
||||||
if !c.GlobalIsSet("config") {
|
if !c.GlobalIsSet("config") {
|
||||||
return errors.New("config flag not set")
|
return errors.New("config flag not set")
|
||||||
}
|
}
|
||||||
@ -40,22 +40,22 @@ func main() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
app.Commands = []cli.Command{
|
app.Commands = []cli.Command{
|
||||||
{
|
{
|
||||||
Name: "sink",
|
Name: "sink",
|
||||||
Aliases: []string{"s"},
|
Aliases: []string{"s"},
|
||||||
Usage: "start in sink mode",
|
Usage: "start in sink mode",
|
||||||
Flags: []cli.Flag{
|
Flags: []cli.Flag{
|
||||||
cli.StringFlag{Name: "identity"},
|
cli.StringFlag{Name: "identity"},
|
||||||
|
},
|
||||||
|
Action: doSink,
|
||||||
},
|
},
|
||||||
Action: doSink,
|
{
|
||||||
},
|
Name: "run",
|
||||||
{
|
Aliases: []string{"r"},
|
||||||
Name: "run",
|
Usage: "do replication",
|
||||||
Aliases: []string{"r"},
|
Action: doRun,
|
||||||
Usage: "do replication",
|
},
|
||||||
Action: doRun,
|
}
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
app.RunAndExitOnError()
|
app.RunAndExitOnError()
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@ package model
|
|||||||
|
|
||||||
type Filesystem struct {
|
type Filesystem struct {
|
||||||
Name string
|
Name string
|
||||||
Parent *Filesystem
|
Parent *Filesystem
|
||||||
Children []Filesystem
|
Children []Filesystem
|
||||||
Snapshots []Snapshot
|
Snapshots []Snapshot
|
||||||
}
|
}
|
||||||
@ -21,9 +21,9 @@ type Pool struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type SSHTransport struct {
|
type SSHTransport struct {
|
||||||
Host string
|
Host string
|
||||||
User string
|
User string
|
||||||
Port uint16
|
Port uint16
|
||||||
TransportOpenCommand []string
|
TransportOpenCommand []string
|
||||||
Options []string
|
Options []string
|
||||||
}
|
}
|
||||||
|
17
rpc/rpc.go
17
rpc/rpc.go
@ -2,12 +2,11 @@ package rpc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
|
||||||
|
|
||||||
. "github.com/zrepl/zrepl/model"
|
|
||||||
. "github.com/zrepl/zrepl/util"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
. "github.com/zrepl/zrepl/model"
|
||||||
|
. "github.com/zrepl/zrepl/util"
|
||||||
|
"io"
|
||||||
"reflect"
|
"reflect"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -157,14 +156,14 @@ func respondWithError(conn io.Writer, id ErrorId, err error) error {
|
|||||||
func inferRequestType(v interface{}) (RequestType, error) {
|
func inferRequestType(v interface{}) (RequestType, error) {
|
||||||
switch v.(type) {
|
switch v.(type) {
|
||||||
case ByteStreamRPCProtocolVersionRequest:
|
case ByteStreamRPCProtocolVersionRequest:
|
||||||
return RTProtocolVersionRequest, nil
|
return RTProtocolVersionRequest, nil
|
||||||
case FilesystemRequest:
|
case FilesystemRequest:
|
||||||
return RTFilesystemRequest, nil
|
return RTFilesystemRequest, nil
|
||||||
case InitialTransferRequest:
|
case InitialTransferRequest:
|
||||||
return RTInitialTransferRequest, nil
|
return RTInitialTransferRequest, nil
|
||||||
default:
|
default:
|
||||||
return 0, errors.New(fmt.Sprintf("cannot infer request type for type '%v'",
|
return 0, errors.New(fmt.Sprintf("cannot infer request type for type '%v'",
|
||||||
reflect.TypeOf(v)))
|
reflect.TypeOf(v)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -182,7 +181,7 @@ func (c ByteStreamRPC) sendRequest(v interface{}) (err error) {
|
|||||||
|
|
||||||
h := RequestHeader{
|
h := RequestHeader{
|
||||||
Type: rt,
|
Type: rt,
|
||||||
Id: genUUID(),
|
Id: genUUID(),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = c.encoder.Encode(h); err != nil {
|
if err = c.encoder.Encode(h); err != nil {
|
||||||
|
@ -6,10 +6,10 @@ type RequestId [16]byte
|
|||||||
type RequestType uint8
|
type RequestType uint8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
RTProtocolVersionRequest RequestType = 1
|
RTProtocolVersionRequest RequestType = 1
|
||||||
RTFilesystemRequest = 16
|
RTFilesystemRequest = 16
|
||||||
RTInitialTransferRequest = 17
|
RTInitialTransferRequest = 17
|
||||||
RTIncrementalTransferRequest = 18
|
RTIncrementalTransferRequest = 18
|
||||||
)
|
)
|
||||||
|
|
||||||
type RequestHeader struct {
|
type RequestHeader struct {
|
||||||
@ -56,9 +56,9 @@ const (
|
|||||||
type ResponseType uint8
|
type ResponseType uint8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ROK ResponseType = 1
|
ROK ResponseType = 1
|
||||||
RFilesystems = 2
|
RFilesystems = 2
|
||||||
RChunkedStream = 3
|
RChunkedStream = 3
|
||||||
)
|
)
|
||||||
|
|
||||||
type ResponseHeader struct {
|
type ResponseHeader struct {
|
||||||
|
@ -1,17 +1,17 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"flag"
|
||||||
"github.com/zrepl/zrepl/model"
|
"github.com/zrepl/zrepl/model"
|
||||||
"github.com/zrepl/zrepl/sshbytestream"
|
"github.com/zrepl/zrepl/sshbytestream"
|
||||||
"github.com/zrepl/zrepl/util"
|
"github.com/zrepl/zrepl/util"
|
||||||
"flag"
|
|
||||||
// "bytes"
|
// "bytes"
|
||||||
_ "bufio"
|
_ "bufio"
|
||||||
// "strings"
|
// "strings"
|
||||||
"io"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
_ "time"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
_ "time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -45,14 +45,13 @@ func main() {
|
|||||||
|
|
||||||
fmt.Fprintf(os.Stderr, "Chunk Count: %d\n", chunker.ChunkCount)
|
fmt.Fprintf(os.Stderr, "Chunk Count: %d\n", chunker.ChunkCount)
|
||||||
|
|
||||||
|
|
||||||
case *mode == "outgoing":
|
case *mode == "outgoing":
|
||||||
|
|
||||||
conn, err := sshbytestream.Outgoing("client", model.SSHTransport{
|
conn, err := sshbytestream.Outgoing("client", model.SSHTransport{
|
||||||
Host: *outgoingHost,
|
Host: *outgoingHost,
|
||||||
User: *outgoingUser,
|
User: *outgoingUser,
|
||||||
Port: uint16(*outgoingPort),
|
Port: uint16(*outgoingPort),
|
||||||
Options: []string{"Compression=no"},
|
Options: []string{"Compression=no"},
|
||||||
TransportOpenCommand: []string{"/tmp/sshwrap", "-mode", "incoming", "-incoming.file", "/random.img"},
|
TransportOpenCommand: []string{"/tmp/sshwrap", "-mode", "incoming", "-incoming.file", "/random.img"},
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -73,10 +72,9 @@ func main() {
|
|||||||
|
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
panic("unsupported mode!")
|
panic("unsupported mode!")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,13 +1,13 @@
|
|||||||
package sshbytestream
|
package sshbytestream
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/zrepl/zrepl/model"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"github.com/zrepl/zrepl/model"
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"bytes"
|
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -28,7 +28,7 @@ func (f IncomingReadWriteCloser) Write(p []byte) (n int, err error) {
|
|||||||
return os.Stdout.Write(p)
|
return os.Stdout.Write(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f IncomingReadWriteCloser) Close() (err error) {
|
func (f IncomingReadWriteCloser) Close() (err error) {
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -37,12 +37,12 @@ func Outgoing(name string, remote model.SSHTransport) (conn io.ReadWriteCloser,
|
|||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
sshArgs := make([]string, 0, 2 * len(remote.Options) + len(remote.TransportOpenCommand) + 4)
|
sshArgs := make([]string, 0, 2*len(remote.Options)+len(remote.TransportOpenCommand)+4)
|
||||||
sshArgs = append(sshArgs,
|
sshArgs = append(sshArgs,
|
||||||
"-p", fmt.Sprintf("%d", remote.Port),
|
"-p", fmt.Sprintf("%d", remote.Port),
|
||||||
"-o", "BatchMode=yes",
|
"-o", "BatchMode=yes",
|
||||||
)
|
)
|
||||||
for _,option := range remote.Options {
|
for _, option := range remote.Options {
|
||||||
sshArgs = append(sshArgs, "-o", option)
|
sshArgs = append(sshArgs, "-o", option)
|
||||||
}
|
}
|
||||||
sshArgs = append(sshArgs, fmt.Sprintf("%s@%s", remote.User, remote.Host))
|
sshArgs = append(sshArgs, fmt.Sprintf("%s@%s", remote.User, remote.Host))
|
||||||
@ -57,16 +57,15 @@ func Outgoing(name string, remote model.SSHTransport) (conn io.ReadWriteCloser,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if out,err = cmd.StdoutPipe(); err != nil {
|
if out, err = cmd.StdoutPipe(); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
f := ForkedSSHReadWriteCloser{
|
f := ForkedSSHReadWriteCloser{
|
||||||
RemoteStdin: in,
|
RemoteStdin: in,
|
||||||
RemoteStdout: out,
|
RemoteStdout: out,
|
||||||
Cancel: cancel,
|
Cancel: cancel,
|
||||||
Command: cmd,
|
Command: cmd,
|
||||||
exitWaitGroup: &sync.WaitGroup{},
|
exitWaitGroup: &sync.WaitGroup{},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -87,11 +86,11 @@ func Outgoing(name string, remote model.SSHTransport) (conn io.ReadWriteCloser,
|
|||||||
return f, nil
|
return f, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type ForkedSSHReadWriteCloser struct {
|
type ForkedSSHReadWriteCloser struct {
|
||||||
RemoteStdin io.Writer
|
RemoteStdin io.Writer
|
||||||
RemoteStdout io.Reader
|
RemoteStdout io.Reader
|
||||||
Command *exec.Cmd
|
Command *exec.Cmd
|
||||||
Cancel context.CancelFunc
|
Cancel context.CancelFunc
|
||||||
exitWaitGroup *sync.WaitGroup
|
exitWaitGroup *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,8 +102,8 @@ func (f ForkedSSHReadWriteCloser) Write(p []byte) (n int, err error) {
|
|||||||
return f.RemoteStdin.Write(p)
|
return f.RemoteStdin.Write(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f ForkedSSHReadWriteCloser) Close() (err error) {
|
func (f ForkedSSHReadWriteCloser) Close() (err error) {
|
||||||
f.Cancel()
|
f.Cancel()
|
||||||
f.exitWaitGroup.Wait()
|
f.exitWaitGroup.Wait()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1,24 +1,24 @@
|
|||||||
package chunking
|
package chunking
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
|
||||||
"encoding/binary"
|
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"io"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ChunkBufSize uint32 = 32*1024
|
var ChunkBufSize uint32 = 32 * 1024
|
||||||
var ChunkHeaderByteOrder = binary.LittleEndian
|
var ChunkHeaderByteOrder = binary.LittleEndian
|
||||||
|
|
||||||
type Unchunker struct {
|
type Unchunker struct {
|
||||||
ChunkCount int
|
ChunkCount int
|
||||||
in io.Reader
|
in io.Reader
|
||||||
remainingChunkBytes uint32
|
remainingChunkBytes uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUnchunker(conn io.Reader) *Unchunker {
|
func NewUnchunker(conn io.Reader) *Unchunker {
|
||||||
return &Unchunker{
|
return &Unchunker{
|
||||||
in: conn,
|
in: conn,
|
||||||
remainingChunkBytes: 0,
|
remainingChunkBytes: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,7 +51,7 @@ func (c *Unchunker) Read(b []byte) (n int, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
n, err = c.in.Read(b[0:maxRead])
|
n, err = c.in.Read(b[0:maxRead])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
c.remainingChunkBytes -= uint32(n)
|
c.remainingChunkBytes -= uint32(n)
|
||||||
@ -69,11 +69,11 @@ func min(a, b int) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Chunker struct {
|
type Chunker struct {
|
||||||
ChunkCount int
|
ChunkCount int
|
||||||
in io.Reader
|
in io.Reader
|
||||||
remainingChunkBytes int
|
remainingChunkBytes int
|
||||||
payloadBuf []byte
|
payloadBuf []byte
|
||||||
headerBuf *bytes.Buffer
|
headerBuf *bytes.Buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewChunker(conn io.Reader) Chunker {
|
func NewChunker(conn io.Reader) Chunker {
|
||||||
@ -85,11 +85,10 @@ func NewChunkerSized(conn io.Reader, chunkSize uint32) Chunker {
|
|||||||
buf := make([]byte, int(chunkSize)-binary.Size(chunkSize))
|
buf := make([]byte, int(chunkSize)-binary.Size(chunkSize))
|
||||||
|
|
||||||
return Chunker{
|
return Chunker{
|
||||||
in: conn,
|
in: conn,
|
||||||
remainingChunkBytes: 0,
|
remainingChunkBytes: 0,
|
||||||
payloadBuf: buf,
|
payloadBuf: buf,
|
||||||
headerBuf: &bytes.Buffer{},
|
headerBuf: &bytes.Buffer{},
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -113,7 +112,7 @@ func (c *Chunker) Read(b []byte) (n int, err error) {
|
|||||||
|
|
||||||
// Write chunk header
|
// Write chunk header
|
||||||
c.headerBuf.Reset()
|
c.headerBuf.Reset()
|
||||||
nextChunkLen := uint32(newPayloadLen);
|
nextChunkLen := uint32(newPayloadLen)
|
||||||
headerLen := binary.Size(nextChunkLen)
|
headerLen := binary.Size(nextChunkLen)
|
||||||
err = binary.Write(c.headerBuf, ChunkHeaderByteOrder, nextChunkLen)
|
err = binary.Write(c.headerBuf, ChunkHeaderByteOrder, nextChunkLen)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -128,5 +127,5 @@ func (c *Chunker) Read(b []byte) (n int, err error) {
|
|||||||
n2 := copy(remainingBuf, c.payloadBuf[:c.remainingChunkBytes])
|
n2 := copy(remainingBuf, c.payloadBuf[:c.remainingChunkBytes])
|
||||||
//fmt.Printf("chunker: written: %d\n", n+int(n2))
|
//fmt.Printf("chunker: written: %d\n", n+int(n2))
|
||||||
c.remainingChunkBytes -= n2
|
c.remainingChunkBytes -= n2
|
||||||
return n+int(n2), err
|
return n + int(n2), err
|
||||||
}
|
}
|
||||||
|
@ -1,11 +1,11 @@
|
|||||||
package zfs
|
package zfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"os/exec"
|
|
||||||
"io"
|
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os/exec"
|
||||||
)
|
)
|
||||||
|
|
||||||
type DatasetMapping interface {
|
type DatasetMapping interface {
|
||||||
@ -26,10 +26,9 @@ func (m GlobMapping) Map(source DatasetPath) (target DatasetPath, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
target = make([]string, 0, len(source) + len(m.TargetRoot))
|
target = make([]string, 0, len(source)+len(m.TargetRoot))
|
||||||
target = append(target, m.TargetRoot...)
|
target = append(target, m.TargetRoot...)
|
||||||
|
|
||||||
|
|
||||||
for si, sc := range source {
|
for si, sc := range source {
|
||||||
target = append(target, sc)
|
target = append(target, sc)
|
||||||
if si < len(m.PrefixPath) {
|
if si < len(m.PrefixPath) {
|
||||||
@ -87,7 +86,7 @@ type ExecMapping struct {
|
|||||||
Args []string
|
Args []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewExecMapping(name string, args... string) (m *ExecMapping) {
|
func NewExecMapping(name string, args ...string) (m *ExecMapping) {
|
||||||
m = &ExecMapping{
|
m = &ExecMapping{
|
||||||
Name: name,
|
Name: name,
|
||||||
Args: args,
|
Args: args,
|
||||||
@ -100,7 +99,6 @@ func (m ExecMapping) Map(source DatasetPath) (target DatasetPath, err error) {
|
|||||||
var stdin io.Writer
|
var stdin io.Writer
|
||||||
var stdout io.Reader
|
var stdout io.Reader
|
||||||
|
|
||||||
|
|
||||||
cmd := exec.Command(m.Name, m.Args...)
|
cmd := exec.Command(m.Name, m.Args...)
|
||||||
|
|
||||||
if stdin, err = cmd.StdinPipe(); err != nil {
|
if stdin, err = cmd.StdinPipe(); err != nil {
|
||||||
@ -124,7 +122,7 @@ func (m ExecMapping) Map(source DatasetPath) (target DatasetPath, err error) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if _, err = io.WriteString(stdin, source.ToString() + "\n"); err != nil {
|
if _, err = io.WriteString(stdin, source.ToString()+"\n"); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -136,8 +134,8 @@ func (m ExecMapping) Map(source DatasetPath) (target DatasetPath, err error) {
|
|||||||
t := resp.Text()
|
t := resp.Text()
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case t == "NOMAP":
|
case t == "NOMAP":
|
||||||
return nil, NoMatchError
|
return nil, NoMatchError
|
||||||
}
|
}
|
||||||
|
|
||||||
target = toDatasetPath(t) // TODO discover garbage?
|
target = toDatasetPath(t) // TODO discover garbage?
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
package zfs
|
package zfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"testing"
|
"testing"
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGlobMapping(t *testing.T) {
|
func TestGlobMapping(t *testing.T) {
|
||||||
@ -40,7 +40,7 @@ func TestComboMapping(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
c := ComboMapping{
|
c := ComboMapping{
|
||||||
Mappings: []DatasetMapping{m1,m2},
|
Mappings: []DatasetMapping{m1, m2},
|
||||||
}
|
}
|
||||||
|
|
||||||
var r DatasetPath
|
var r DatasetPath
|
||||||
@ -48,7 +48,7 @@ func TestComboMapping(t *testing.T) {
|
|||||||
|
|
||||||
p := toDatasetPath("a/b/q")
|
p := toDatasetPath("a/b/q")
|
||||||
|
|
||||||
r,err = m2.Map(p)
|
r, err = m2.Map(p)
|
||||||
assert.Equal(t, NoMatchError, err)
|
assert.Equal(t, NoMatchError, err)
|
||||||
|
|
||||||
r, err = c.Map(p)
|
r, err = c.Map(p)
|
||||||
|
18
zfs/zfs.go
18
zfs/zfs.go
@ -1,14 +1,14 @@
|
|||||||
package zfs
|
package zfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/zrepl/zrepl/model"
|
|
||||||
"os/exec"
|
|
||||||
"bufio"
|
"bufio"
|
||||||
"strings"
|
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/zrepl/zrepl/model"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"os/exec"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func InitialSend(snapshot string) (io.Reader, error) {
|
func InitialSend(snapshot string) (io.Reader, error) {
|
||||||
@ -62,9 +62,9 @@ func zfsList(root string, filter DatasetFilter) (datasets []DatasetPath, err err
|
|||||||
const ZFS_LIST_FIELD_COUNT = 1
|
const ZFS_LIST_FIELD_COUNT = 1
|
||||||
|
|
||||||
cmd := exec.Command(ZFS_BINARY, "list", "-H", "-r",
|
cmd := exec.Command(ZFS_BINARY, "list", "-H", "-r",
|
||||||
"-t", "filesystem,volume",
|
"-t", "filesystem,volume",
|
||||||
"-o", "name",
|
"-o", "name",
|
||||||
root)
|
root)
|
||||||
|
|
||||||
var stdout io.Reader
|
var stdout io.Reader
|
||||||
var stderr io.Reader
|
var stderr io.Reader
|
||||||
@ -103,9 +103,9 @@ func zfsList(root string, filter DatasetFilter) (datasets []DatasetPath, err err
|
|||||||
|
|
||||||
stderrOutput, err := ioutil.ReadAll(stderr)
|
stderrOutput, err := ioutil.ReadAll(stderr)
|
||||||
|
|
||||||
if waitErr:= cmd.Wait(); waitErr != nil {
|
if waitErr := cmd.Wait(); waitErr != nil {
|
||||||
err := ZFSError{
|
err := ZFSError{
|
||||||
Stderr: stderrOutput,
|
Stderr: stderrOutput,
|
||||||
WaitErr: waitErr,
|
WaitErr: waitErr,
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
package zfs
|
package zfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestZFSListHandlesProducesZFSErrorOnNonZeroExit(t *testing.T) {
|
func TestZFSListHandlesProducesZFSErrorOnNonZeroExit(t *testing.T) {
|
||||||
|
Loading…
Reference in New Issue
Block a user