diff --git a/cmd/config.go b/cmd/config.go index a9ccc5d..3e07343 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -1,29 +1,29 @@ package main import ( - yaml "gopkg.in/yaml.v2" - "io/ioutil" + "errors" "github.com/mitchellh/mapstructure" "github.com/zrepl/zrepl/zfs" - "errors" + yaml "gopkg.in/yaml.v2" + "io/ioutil" "strings" ) type Pool struct { - Name string - Url string + Name string + Url string } type Push struct { - To string + To string Datasets []zfs.DatasetPath } type Pull struct { - From string - Mapping zfs.DatasetMapping + From string + Mapping zfs.DatasetMapping } type Sink struct { - From string - Mapping zfs.DatasetMapping + From string + Mapping zfs.DatasetMapping } type Config struct { @@ -74,8 +74,8 @@ func parsePools(v interface{}) (p []Pool, err error) { func parsePushs(v interface{}) (p []Push, err error) { - asList := make([]struct{ - To string + asList := make([]struct { + To string Datasets []string }, 0) @@ -87,7 +87,7 @@ func parsePushs(v interface{}) (p []Push, err error) { for _, e := range asList { push := Push{ - To: e.To, + To: e.To, Datasets: make([]zfs.DatasetPath, len(e.Datasets)), } @@ -105,12 +105,11 @@ func parsePushs(v interface{}) (p []Push, err error) { func parsePulls(v interface{}) (p []Pull, err error) { - asList := make([]struct{ - From string + asList := make([]struct { + From string Mapping map[string]string }, 0) - if err = mapstructure.Decode(v, &asList); err != nil { return } @@ -134,7 +133,7 @@ func parseSinks(v interface{}) (s []Sink, err error) { var asList []interface{} var ok bool - if asList, ok = v.([]interface{}); !ok { + if asList, ok = v.([]interface{}); !ok { return nil, errors.New("expected list") } @@ -153,7 +152,7 @@ func parseSinks(v interface{}) (s []Sink, err error) { func parseSink(v interface{}) (s Sink, err error) { t := struct { - From string + From string Mapping map[string]string }{} if err = mapstructure.Decode(v, &t); err != nil { @@ -169,7 +168,7 @@ func parseComboMapping(m map[string]string) (c zfs.ComboMapping, err error) { c.Mappings = make([]zfs.DatasetMapping, len(m)) - for lhs,rhs := range m { + for lhs, rhs := range m { if lhs[0] == '|' { @@ -178,7 +177,7 @@ func parseComboMapping(m map[string]string) (c zfs.ComboMapping, err error) { } m := zfs.DirectMapping{ - Source: nil, + Source: nil, } if m.Target, err = zfs.NewDatasetPath(rhs); err != nil { diff --git a/cmd/config_test.go b/cmd/config_test.go index cf68dac..8d0a303 100644 --- a/cmd/config_test.go +++ b/cmd/config_test.go @@ -1,8 +1,8 @@ package main import ( - "testing" "github.com/stretchr/testify/assert" + "testing" ) func TestSampleConfigFileIsParsedWithoutErrors(t *testing.T) { diff --git a/cmd/handler.go b/cmd/handler.go index ea89f22..5c9e958 100644 --- a/cmd/handler.go +++ b/cmd/handler.go @@ -1,12 +1,13 @@ package main import ( - "io" - "github.com/zrepl/zrepl/zfs" "github.com/zrepl/zrepl/model" "github.com/zrepl/zrepl/rpc" + "github.com/zrepl/zrepl/zfs" + "io" ) -type Handler struct {} + +type Handler struct{} func (h Handler) HandleFilesystemRequest(r rpc.FilesystemRequest) (roots []model.Filesystem, err error) { diff --git a/cmd/main.go b/cmd/main.go index c04ab42..4e4da4d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,18 +1,18 @@ package main import ( - "github.com/urfave/cli" "errors" "fmt" - "io" - "github.com/zrepl/zrepl/sshbytestream" + "github.com/urfave/cli" "github.com/zrepl/zrepl/rpc" + "github.com/zrepl/zrepl/sshbytestream" + "io" ) type Role uint const ( - ROLE_IPC Role = iota + ROLE_IPC Role = iota ROLE_ACTION Role = iota ) @@ -29,7 +29,7 @@ func main() { app.Flags = []cli.Flag{ cli.StringFlag{Name: "config"}, } - app.Before = func (c *cli.Context) (err error) { + app.Before = func(c *cli.Context) (err error) { if !c.GlobalIsSet("config") { return errors.New("config flag not set") } @@ -40,22 +40,22 @@ func main() { return } app.Commands = []cli.Command{ - { - Name: "sink", - Aliases: []string{"s"}, - Usage: "start in sink mode", - Flags: []cli.Flag{ - cli.StringFlag{Name: "identity"}, + { + Name: "sink", + Aliases: []string{"s"}, + Usage: "start in sink mode", + Flags: []cli.Flag{ + cli.StringFlag{Name: "identity"}, + }, + Action: doSink, }, - Action: doSink, - }, - { - Name: "run", - Aliases: []string{"r"}, - Usage: "do replication", - Action: doRun, - }, -} + { + Name: "run", + Aliases: []string{"r"}, + Usage: "do replication", + Action: doRun, + }, + } app.RunAndExitOnError() diff --git a/model/model.go b/model/model.go index 399a12b..661b8e9 100644 --- a/model/model.go +++ b/model/model.go @@ -2,7 +2,7 @@ package model type Filesystem struct { Name string - Parent *Filesystem + Parent *Filesystem Children []Filesystem Snapshots []Snapshot } @@ -21,9 +21,9 @@ type Pool struct { } type SSHTransport struct { - Host string - User string - Port uint16 + Host string + User string + Port uint16 TransportOpenCommand []string - Options []string + Options []string } diff --git a/rpc/rpc.go b/rpc/rpc.go index 3c17d94..1e3da03 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -2,12 +2,11 @@ package rpc import ( "encoding/json" - "io" - - . "github.com/zrepl/zrepl/model" - . "github.com/zrepl/zrepl/util" "errors" "fmt" + . "github.com/zrepl/zrepl/model" + . "github.com/zrepl/zrepl/util" + "io" "reflect" ) @@ -157,14 +156,14 @@ func respondWithError(conn io.Writer, id ErrorId, err error) error { func inferRequestType(v interface{}) (RequestType, error) { switch v.(type) { case ByteStreamRPCProtocolVersionRequest: - return RTProtocolVersionRequest, nil + return RTProtocolVersionRequest, nil case FilesystemRequest: - return RTFilesystemRequest, nil + return RTFilesystemRequest, nil case InitialTransferRequest: - return RTInitialTransferRequest, nil + return RTInitialTransferRequest, nil default: return 0, errors.New(fmt.Sprintf("cannot infer request type for type '%v'", - reflect.TypeOf(v))) + reflect.TypeOf(v))) } } @@ -182,7 +181,7 @@ func (c ByteStreamRPC) sendRequest(v interface{}) (err error) { h := RequestHeader{ Type: rt, - Id: genUUID(), + Id: genUUID(), } if err = c.encoder.Encode(h); err != nil { diff --git a/rpc/structs.go b/rpc/structs.go index aa4cab2..30b4edb 100644 --- a/rpc/structs.go +++ b/rpc/structs.go @@ -6,10 +6,10 @@ type RequestId [16]byte type RequestType uint8 const ( - RTProtocolVersionRequest RequestType = 1 - RTFilesystemRequest = 16 - RTInitialTransferRequest = 17 - RTIncrementalTransferRequest = 18 + RTProtocolVersionRequest RequestType = 1 + RTFilesystemRequest = 16 + RTInitialTransferRequest = 17 + RTIncrementalTransferRequest = 18 ) type RequestHeader struct { @@ -56,9 +56,9 @@ const ( type ResponseType uint8 const ( - ROK ResponseType = 1 - RFilesystems = 2 - RChunkedStream = 3 + ROK ResponseType = 1 + RFilesystems = 2 + RChunkedStream = 3 ) type ResponseHeader struct { diff --git a/scratchpad/chunker/main.go b/scratchpad/chunker/main.go index def0518..bee7432 100644 --- a/scratchpad/chunker/main.go +++ b/scratchpad/chunker/main.go @@ -1,17 +1,17 @@ package main import ( + "flag" "github.com/zrepl/zrepl/model" "github.com/zrepl/zrepl/sshbytestream" "github.com/zrepl/zrepl/util" - "flag" // "bytes" _ "bufio" // "strings" - "io" "fmt" - _ "time" + "io" "os" + _ "time" ) func main() { @@ -45,14 +45,13 @@ func main() { fmt.Fprintf(os.Stderr, "Chunk Count: %d\n", chunker.ChunkCount) - case *mode == "outgoing": conn, err := sshbytestream.Outgoing("client", model.SSHTransport{ - Host: *outgoingHost, - User: *outgoingUser, - Port: uint16(*outgoingPort), - Options: []string{"Compression=no"}, + Host: *outgoingHost, + User: *outgoingUser, + Port: uint16(*outgoingPort), + Options: []string{"Compression=no"}, TransportOpenCommand: []string{"/tmp/sshwrap", "-mode", "incoming", "-incoming.file", "/random.img"}, }) @@ -73,10 +72,9 @@ func main() { os.Exit(0) - default: panic("unsupported mode!") } -} \ No newline at end of file +} diff --git a/sshbytestream/ssh.go b/sshbytestream/ssh.go index 3023969..d8a7cfa 100644 --- a/sshbytestream/ssh.go +++ b/sshbytestream/ssh.go @@ -1,13 +1,13 @@ package sshbytestream import ( + "bytes" + "context" + "fmt" + "github.com/zrepl/zrepl/model" "io" "os" "os/exec" - "github.com/zrepl/zrepl/model" - "context" - "fmt" - "bytes" "sync" ) @@ -28,7 +28,7 @@ func (f IncomingReadWriteCloser) Write(p []byte) (n int, err error) { return os.Stdout.Write(p) } -func (f IncomingReadWriteCloser) Close() (err error) { +func (f IncomingReadWriteCloser) Close() (err error) { os.Exit(0) return nil } @@ -37,12 +37,12 @@ func Outgoing(name string, remote model.SSHTransport) (conn io.ReadWriteCloser, ctx, cancel := context.WithCancel(context.Background()) - sshArgs := make([]string, 0, 2 * len(remote.Options) + len(remote.TransportOpenCommand) + 4) + sshArgs := make([]string, 0, 2*len(remote.Options)+len(remote.TransportOpenCommand)+4) sshArgs = append(sshArgs, "-p", fmt.Sprintf("%d", remote.Port), "-o", "BatchMode=yes", ) - for _,option := range remote.Options { + for _, option := range remote.Options { sshArgs = append(sshArgs, "-o", option) } sshArgs = append(sshArgs, fmt.Sprintf("%s@%s", remote.User, remote.Host)) @@ -57,16 +57,15 @@ func Outgoing(name string, remote model.SSHTransport) (conn io.ReadWriteCloser, return } - if out,err = cmd.StdoutPipe(); err != nil { + if out, err = cmd.StdoutPipe(); err != nil { return } - f := ForkedSSHReadWriteCloser{ - RemoteStdin: in, - RemoteStdout: out, - Cancel: cancel, - Command: cmd, + RemoteStdin: in, + RemoteStdout: out, + Cancel: cancel, + Command: cmd, exitWaitGroup: &sync.WaitGroup{}, } @@ -87,11 +86,11 @@ func Outgoing(name string, remote model.SSHTransport) (conn io.ReadWriteCloser, return f, nil } -type ForkedSSHReadWriteCloser struct { - RemoteStdin io.Writer - RemoteStdout io.Reader - Command *exec.Cmd - Cancel context.CancelFunc +type ForkedSSHReadWriteCloser struct { + RemoteStdin io.Writer + RemoteStdout io.Reader + Command *exec.Cmd + Cancel context.CancelFunc exitWaitGroup *sync.WaitGroup } @@ -103,8 +102,8 @@ func (f ForkedSSHReadWriteCloser) Write(p []byte) (n int, err error) { return f.RemoteStdin.Write(p) } -func (f ForkedSSHReadWriteCloser) Close() (err error) { +func (f ForkedSSHReadWriteCloser) Close() (err error) { f.Cancel() f.exitWaitGroup.Wait() return nil -} \ No newline at end of file +} diff --git a/util/chunking.go b/util/chunking.go index 2ba9cb5..2ef501f 100644 --- a/util/chunking.go +++ b/util/chunking.go @@ -1,24 +1,24 @@ package chunking import ( - "io" - "encoding/binary" "bytes" + "encoding/binary" + "io" ) -var ChunkBufSize uint32 = 32*1024 +var ChunkBufSize uint32 = 32 * 1024 var ChunkHeaderByteOrder = binary.LittleEndian type Unchunker struct { - ChunkCount int - in io.Reader + ChunkCount int + in io.Reader remainingChunkBytes uint32 } func NewUnchunker(conn io.Reader) *Unchunker { return &Unchunker{ - in: conn, - remainingChunkBytes: 0, + in: conn, + remainingChunkBytes: 0, } } @@ -51,7 +51,7 @@ func (c *Unchunker) Read(b []byte) (n int, err error) { } n, err = c.in.Read(b[0:maxRead]) - if err != nil { + if err != nil { return n, err } c.remainingChunkBytes -= uint32(n) @@ -69,11 +69,11 @@ func min(a, b int) int { } type Chunker struct { - ChunkCount int - in io.Reader + ChunkCount int + in io.Reader remainingChunkBytes int - payloadBuf []byte - headerBuf *bytes.Buffer + payloadBuf []byte + headerBuf *bytes.Buffer } func NewChunker(conn io.Reader) Chunker { @@ -85,11 +85,10 @@ func NewChunkerSized(conn io.Reader, chunkSize uint32) Chunker { buf := make([]byte, int(chunkSize)-binary.Size(chunkSize)) return Chunker{ - in: conn, + in: conn, remainingChunkBytes: 0, - payloadBuf: buf, - headerBuf: &bytes.Buffer{}, - + payloadBuf: buf, + headerBuf: &bytes.Buffer{}, } } @@ -113,7 +112,7 @@ func (c *Chunker) Read(b []byte) (n int, err error) { // Write chunk header c.headerBuf.Reset() - nextChunkLen := uint32(newPayloadLen); + nextChunkLen := uint32(newPayloadLen) headerLen := binary.Size(nextChunkLen) err = binary.Write(c.headerBuf, ChunkHeaderByteOrder, nextChunkLen) if err != nil { @@ -128,5 +127,5 @@ func (c *Chunker) Read(b []byte) (n int, err error) { n2 := copy(remainingBuf, c.payloadBuf[:c.remainingChunkBytes]) //fmt.Printf("chunker: written: %d\n", n+int(n2)) c.remainingChunkBytes -= n2 - return n+int(n2), err -} \ No newline at end of file + return n + int(n2), err +} diff --git a/zfs/mapping.go b/zfs/mapping.go index 644f3c9..c73f9a0 100644 --- a/zfs/mapping.go +++ b/zfs/mapping.go @@ -1,11 +1,11 @@ package zfs import ( - "errors" - "os/exec" - "io" "bufio" + "errors" "fmt" + "io" + "os/exec" ) type DatasetMapping interface { @@ -26,10 +26,9 @@ func (m GlobMapping) Map(source DatasetPath) (target DatasetPath, err error) { return } - target = make([]string, 0, len(source) + len(m.TargetRoot)) + target = make([]string, 0, len(source)+len(m.TargetRoot)) target = append(target, m.TargetRoot...) - for si, sc := range source { target = append(target, sc) if si < len(m.PrefixPath) { @@ -87,7 +86,7 @@ type ExecMapping struct { Args []string } -func NewExecMapping(name string, args... string) (m *ExecMapping) { +func NewExecMapping(name string, args ...string) (m *ExecMapping) { m = &ExecMapping{ Name: name, Args: args, @@ -100,7 +99,6 @@ func (m ExecMapping) Map(source DatasetPath) (target DatasetPath, err error) { var stdin io.Writer var stdout io.Reader - cmd := exec.Command(m.Name, m.Args...) if stdin, err = cmd.StdinPipe(); err != nil { @@ -124,7 +122,7 @@ func (m ExecMapping) Map(source DatasetPath) (target DatasetPath, err error) { } }() - if _, err = io.WriteString(stdin, source.ToString() + "\n"); err != nil { + if _, err = io.WriteString(stdin, source.ToString()+"\n"); err != nil { return } @@ -136,8 +134,8 @@ func (m ExecMapping) Map(source DatasetPath) (target DatasetPath, err error) { t := resp.Text() switch { - case t == "NOMAP": - return nil, NoMatchError + case t == "NOMAP": + return nil, NoMatchError } target = toDatasetPath(t) // TODO discover garbage? diff --git a/zfs/mapping_test.go b/zfs/mapping_test.go index 3fa0600..a1a741d 100644 --- a/zfs/mapping_test.go +++ b/zfs/mapping_test.go @@ -1,8 +1,8 @@ package zfs import ( + "github.com/stretchr/testify/assert" "testing" - "github.com/stretchr/testify/assert" ) func TestGlobMapping(t *testing.T) { @@ -40,7 +40,7 @@ func TestComboMapping(t *testing.T) { } c := ComboMapping{ - Mappings: []DatasetMapping{m1,m2}, + Mappings: []DatasetMapping{m1, m2}, } var r DatasetPath @@ -48,7 +48,7 @@ func TestComboMapping(t *testing.T) { p := toDatasetPath("a/b/q") - r,err = m2.Map(p) + r, err = m2.Map(p) assert.Equal(t, NoMatchError, err) r, err = c.Map(p) diff --git a/zfs/zfs.go b/zfs/zfs.go index 0bb425d..ad2764e 100644 --- a/zfs/zfs.go +++ b/zfs/zfs.go @@ -1,14 +1,14 @@ package zfs import ( - "github.com/zrepl/zrepl/model" - "os/exec" "bufio" - "strings" "errors" - "io" "fmt" + "github.com/zrepl/zrepl/model" + "io" "io/ioutil" + "os/exec" + "strings" ) func InitialSend(snapshot string) (io.Reader, error) { @@ -62,9 +62,9 @@ func zfsList(root string, filter DatasetFilter) (datasets []DatasetPath, err err const ZFS_LIST_FIELD_COUNT = 1 cmd := exec.Command(ZFS_BINARY, "list", "-H", "-r", - "-t", "filesystem,volume", - "-o", "name", - root) + "-t", "filesystem,volume", + "-o", "name", + root) var stdout io.Reader var stderr io.Reader @@ -103,9 +103,9 @@ func zfsList(root string, filter DatasetFilter) (datasets []DatasetPath, err err stderrOutput, err := ioutil.ReadAll(stderr) - if waitErr:= cmd.Wait(); waitErr != nil { + if waitErr := cmd.Wait(); waitErr != nil { err := ZFSError{ - Stderr: stderrOutput, + Stderr: stderrOutput, WaitErr: waitErr, } return nil, err diff --git a/zfs/zfs_test.go b/zfs/zfs_test.go index cd659ad..04cd888 100644 --- a/zfs/zfs_test.go +++ b/zfs/zfs_test.go @@ -1,8 +1,8 @@ package zfs import ( - "testing" "github.com/stretchr/testify/assert" + "testing" ) func TestZFSListHandlesProducesZFSErrorOnNonZeroExit(t *testing.T) {