From afed7627749bee3726a06f7855a11a82b6769b09 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 22 Mar 2019 19:41:12 +0100 Subject: [PATCH] format source tree using goimports --- Makefile | 3 + build/build.go | 2 + cli/cli.go | 25 +-- client/configcheck.go | 20 +-- client/jsonclient.go | 3 +- client/migrate.go | 1 + client/pprof.go | 5 +- client/signal.go | 5 +- client/status.go | 64 +++---- client/stdinserver.go | 8 +- client/testcmd.go | 16 +- client/version.go | 12 +- config/config.go | 158 ++++++++++-------- config/config_global_test.go | 9 +- config/config_minimal_test.go | 3 +- config/config_snapshotting_test.go | 7 +- config/retentiongrid.go | 6 +- daemon/control.go | 27 +-- daemon/daemon.go | 19 ++- daemon/filters/fsmapfilter.go | 4 +- daemon/filters/fsvfilter.go | 4 +- daemon/job/build_jobs.go | 1 + daemon/job/job.go | 24 +-- daemon/job/snapjob.go | 1 + daemon/logging/logging_formatters.go | 4 +- daemon/logging/logging_outlets.go | 6 +- daemon/main.go | 2 +- daemon/nethelpers/helpers.go | 3 +- daemon/prometheus.go | 9 +- daemon/pruner/pruner.go | 75 +++++---- daemon/pruner/pruner_queue.go | 9 +- daemon/snapper/snapper.go | 42 ++--- daemon/snapper/snapper_all.go | 3 +- endpoint/context.go | 1 + endpoint/endpoint.go | 1 + logger/datastructures.go | 5 +- logger/logger_test.go | 6 +- logger/stderrlogger.go | 2 +- pruning/keep_grid.go | 10 +- pruning/keep_helpers_test.go | 3 +- pruning/keep_last_n.go | 3 +- pruning/keep_last_n_test.go | 3 +- pruning/keep_regex.go | 2 +- pruning/pruning.go | 6 +- pruning/retentiongrid/retentiongrid_test.go | 3 +- replication/driver/replication_driver.go | 5 +- .../driver/replication_driver_debug.go | 2 +- replication/driver/replication_driver_test.go | 1 + replication/logic/diff/diff_test.go | 2 +- replication/logic/pdu/pdu_extras.go | 3 +- replication/logic/pdu/pdu_test.go | 17 +- replication/logic/replication_logic.go | 4 +- rpc/dataconn/dataconn_client.go | 6 +- rpc/dataconn/dataconn_server.go | 1 + rpc/dataconn/frameconn/frameconn.go | 1 + .../frameconn/frameconn_shutdown_fsm.go | 7 +- rpc/dataconn/frameconn/frameconn_test.go | 1 - .../heartbeatconn/heartbeatconn_test.go | 1 + rpc/dataconn/stream/stream_test.go | 1 + .../internal/wireevaluator/wireevaluator.go | 1 + .../wireevaluator/wireevaluator_closewrite.go | 4 +- rpc/dataconn/timeoutconn/timeoutconn_test.go | 1 + .../authlistener_grpc_adaptor.go | 5 +- .../example/pdu/grpcauth.pb.go | 3 +- .../authlistener_netlistener_adaptor.go | 3 +- rpc/rpc_client.go | 3 +- rpc/rpc_doc.go | 1 - rpc/rpc_mux.go | 2 +- rpc/transportmux/transportmux.go | 9 +- rpc/versionhandshake/versionhandshake.go | 16 +- rpc/versionhandshake/versionhandshake_test.go | 14 +- .../versionhandshake_transport_wrappers.go | 9 +- transport/fromconfig/transport_fromconfig.go | 7 +- transport/local/connect_local.go | 4 +- transport/local/serve_local.go | 25 +-- transport/ssh/connect_ssh.go | 4 +- transport/ssh/serve_stdinserver.go | 32 ++-- transport/tcp/serve_tcp.go | 13 +- transport/tls/connect_tls.go | 1 + transport/tls/serve_tls.go | 18 +- .../bytecounter_streamcopier_test.go | 1 + util/chainlock/chainlock.go | 2 +- util/chunking_test.go | 3 +- util/contextflexibletimeout_test.go | 5 +- util/io.go | 10 +- util/iocommand.go | 5 +- zfs/datasetpath_visitor_test.go | 3 +- zfs/mapping.go | 3 +- zfs/replication_history.go | 3 +- zfs/resume_token_test.go | 6 +- zfs/versions.go | 3 +- zfs/zfs.go | 111 ++++++------ zfs/zfs_test.go | 41 ++--- 93 files changed, 585 insertions(+), 463 deletions(-) diff --git a/Makefile b/Makefile index 42b6990..ade61e7 100644 --- a/Makefile +++ b/Makefile @@ -30,6 +30,9 @@ generate: #not part of the build, must do that manually protoc -I=replication/logic/pdu --go_out=plugins=grpc:replication/logic/pdu replication/logic/pdu/pdu.proto go generate -x ./... +format: + goimports -srcdir . -local 'github.com/zrepl/zrepl' -w $(shell find . -type f -name '*.go' -not -path "./vendor/*") + build: @echo "INFO: In case of missing dependencies, run 'make vendordeps'" $(GO_BUILD) -o "$(ARTIFACTDIR)/zrepl" diff --git a/build/build.go b/build/build.go index ec978e4..8f31bd6 100644 --- a/build/build.go +++ b/build/build.go @@ -11,7 +11,9 @@ package main import ( + "fmt" _ "fmt" + _ "github.com/alvaroloes/enumer" _ "github.com/golang/protobuf/protoc-gen-go" _ "golang.org/x/tools/cmd/stringer" diff --git a/cli/cli.go b/cli/cli.go index 6bfc313..8e2ede0 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -2,10 +2,12 @@ package cli import ( "fmt" + "os" + "github.com/spf13/cobra" "github.com/spf13/pflag" + "github.com/zrepl/zrepl/config" - "os" ) var rootArgs struct { @@ -40,15 +42,15 @@ func init() { } type Subcommand struct { - Use string - Short string - Example string - NoRequireConfig bool - Run func(subcommand *Subcommand, args []string) error - SetupFlags func(f *pflag.FlagSet) - SetupSubcommands func() []*Subcommand + Use string + Short string + Example string + NoRequireConfig bool + Run func(subcommand *Subcommand, args []string) error + SetupFlags func(f *pflag.FlagSet) + SetupSubcommands func() []*Subcommand - config *config.Config + config *config.Config configErr error } @@ -93,8 +95,8 @@ func AddSubcommand(s *Subcommand) { func addSubcommandToCobraCmd(c *cobra.Command, s *Subcommand) { cmd := cobra.Command{ - Use: s.Use, - Short: s.Short, + Use: s.Use, + Short: s.Short, Example: s.Example, } if s.SetupSubcommands == nil { @@ -110,7 +112,6 @@ func addSubcommandToCobraCmd(c *cobra.Command, s *Subcommand) { c.AddCommand(&cmd) } - func Run() { if err := rootCmd.Execute(); err != nil { os.Exit(1) diff --git a/client/configcheck.go b/client/configcheck.go index 77d3699..8ecd981 100644 --- a/client/configcheck.go +++ b/client/configcheck.go @@ -3,33 +3,35 @@ package client import ( "encoding/json" "fmt" + "os" + "github.com/kr/pretty" "github.com/pkg/errors" "github.com/spf13/pflag" "github.com/zrepl/yaml-config" + "github.com/zrepl/zrepl/cli" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/job" "github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/logger" - "os" ) var configcheckArgs struct { format string - what string + what string } var ConfigcheckCmd = &cli.Subcommand{ - Use: "configcheck", + Use: "configcheck", Short: "check if config can be parsed without errors", SetupFlags: func(f *pflag.FlagSet) { f.StringVar(&configcheckArgs.format, "format", "", "dump parsed config object [pretty|yaml|json]") f.StringVar(&configcheckArgs.what, "what", "all", "what to print [all|config|jobs|logging]") }, Run: func(subcommand *cli.Subcommand, args []string) error { - formatMap := map[string]func(interface{}) { - "": func(i interface{}) {}, + formatMap := map[string]func(interface{}){ + "": func(i interface{}) {}, "pretty": func(i interface{}) { pretty.Println(i) }, "json": func(i interface{}) { json.NewEncoder(os.Stdout).Encode(subcommand.Config()) @@ -71,12 +73,11 @@ var ConfigcheckCmd = &cli.Subcommand{ } } - - whatMap := map[string]func() { + whatMap := map[string]func(){ "all": func() { o := struct { - config *config.Config - jobs []job.Job + config *config.Config + jobs []job.Job logging *logger.Outlets }{ subcommand.Config(), @@ -109,4 +110,3 @@ var ConfigcheckCmd = &cli.Subcommand{ } }, } - diff --git a/client/jsonclient.go b/client/jsonclient.go index 41c272d..9f66aec 100644 --- a/client/jsonclient.go +++ b/client/jsonclient.go @@ -4,10 +4,11 @@ import ( "bytes" "context" "encoding/json" - "github.com/pkg/errors" "io" "net" "net/http" + + "github.com/pkg/errors" ) func controlHttpClient(sockpath string) (client http.Client, err error) { diff --git a/client/migrate.go b/client/migrate.go index 3b516bf..22a7270 100644 --- a/client/migrate.go +++ b/client/migrate.go @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" "github.com/spf13/pflag" + "github.com/zrepl/zrepl/zfs" "github.com/zrepl/zrepl/cli" diff --git a/client/pprof.go b/client/pprof.go index 29cd4e5..1702e84 100644 --- a/client/pprof.go +++ b/client/pprof.go @@ -2,11 +2,12 @@ package client import ( "errors" + "log" + "os" + "github.com/zrepl/zrepl/cli" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon" - "log" - "os" ) var pprofArgs struct { diff --git a/client/signal.go b/client/signal.go index 701849e..472c7ed 100644 --- a/client/signal.go +++ b/client/signal.go @@ -2,6 +2,7 @@ package client import ( "github.com/pkg/errors" + "github.com/zrepl/zrepl/cli" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon" @@ -28,10 +29,10 @@ func runSignalCmd(config *config.Config, args []string) error { err = jsonRequestResponse(httpc, daemon.ControlJobEndpointSignal, struct { Name string - Op string + Op string }{ Name: args[1], - Op: args[0], + Op: args[0], }, struct{}{}, ) diff --git a/client/status.go b/client/status.go index 9abc07a..0e7267a 100644 --- a/client/status.go +++ b/client/status.go @@ -2,15 +2,6 @@ package client import ( "fmt" - "github.com/gdamore/tcell/termbox" - "github.com/pkg/errors" - "github.com/spf13/pflag" - "github.com/zrepl/yaml-config" - "github.com/zrepl/zrepl/cli" - "github.com/zrepl/zrepl/daemon" - "github.com/zrepl/zrepl/daemon/job" - "github.com/zrepl/zrepl/daemon/pruner" - "github.com/zrepl/zrepl/replication/report" "io" "math" "net/http" @@ -19,18 +10,29 @@ import ( "strings" "sync" "time" + + "github.com/gdamore/tcell/termbox" + "github.com/pkg/errors" + "github.com/spf13/pflag" + "github.com/zrepl/yaml-config" + + "github.com/zrepl/zrepl/cli" + "github.com/zrepl/zrepl/daemon" + "github.com/zrepl/zrepl/daemon/job" + "github.com/zrepl/zrepl/daemon/pruner" + "github.com/zrepl/zrepl/replication/report" ) type byteProgressMeasurement struct { time time.Time - val int64 + val int64 } type bytesProgressHistory struct { - last *byteProgressMeasurement // pointer as poor man's optional + last *byteProgressMeasurement // pointer as poor man's optional changeCount int - lastChange time.Time - bpsAvg float64 + lastChange time.Time + bpsAvg float64 } func (p *bytesProgressHistory) Update(currentVal int64) (bytesPerSecondAvg int64, changeCount int) { @@ -38,7 +40,7 @@ func (p *bytesProgressHistory) Update(currentVal int64) (bytesPerSecondAvg int64 if p.last == nil { p.last = &byteProgressMeasurement{ time: time.Now(), - val: currentVal, + val: currentVal, } return 0, 0 } @@ -48,18 +50,17 @@ func (p *bytesProgressHistory) Update(currentVal int64) (bytesPerSecondAvg int64 p.lastChange = time.Now() } - if time.Now().Sub(p.lastChange) > 3 * time.Second { + if time.Now().Sub(p.lastChange) > 3*time.Second { p.last = nil return 0, 0 } - - deltaV := currentVal - p.last.val; + deltaV := currentVal - p.last.val deltaT := time.Now().Sub(p.last.time) rate := float64(deltaV) / deltaT.Seconds() factor := 0.3 - p.bpsAvg = (1-factor) * p.bpsAvg + factor * rate + p.bpsAvg = (1-factor)*p.bpsAvg + factor*rate p.last.time = time.Now() p.last.val = currentVal @@ -119,7 +120,7 @@ func wrap(s string, width int) string { rem = len(s) } if idx := strings.IndexAny(s, "\n\r"); idx != -1 && idx < rem { - rem = idx+1 + rem = idx + 1 } untilNewline := strings.TrimRight(s[:rem], "\n\r") s = s[rem:] @@ -135,12 +136,12 @@ func wrap(s string, width int) string { func (t *tui) printfDrawIndentedAndWrappedIfMultiline(format string, a ...interface{}) { whole := fmt.Sprintf(format, a...) width, _ := termbox.Size() - if !strings.ContainsAny(whole, "\n\r") && t.x + len(whole) <= width { + if !strings.ContainsAny(whole, "\n\r") && t.x+len(whole) <= width { t.printf(format, a...) } else { t.addIndent(1) t.newline() - t.write(wrap(whole, width - INDENT_MULTIPLIER*t.indent)) + t.write(wrap(whole, width-INDENT_MULTIPLIER*t.indent)) t.addIndent(-1) } } @@ -159,7 +160,6 @@ func (t *tui) addIndent(indent int) { t.moveLine(0, 0) } - var statusFlags struct { Raw bool } @@ -180,7 +180,7 @@ func runStatus(s *cli.Subcommand, args []string) error { } if statusFlags.Raw { - resp, err := httpc.Get("http://unix"+daemon.ControlJobEndpointStatus) + resp, err := httpc.Get("http://unix" + daemon.ControlJobEndpointStatus) if err != nil { return err } @@ -390,7 +390,7 @@ func (t *tui) renderReplicationReport(rep *report.Report, history *bytesProgress t.newline() t.addIndent(1) for i, a := range rep.Attempts[:len(rep.Attempts)-1] { - t.printfDrawIndentedAndWrappedIfMultiline("#%d: %s (failed at %s) (ran %s)", i + 1, a.State, a.FinishAt, a.FinishAt.Sub(a.StartAt)) + t.printfDrawIndentedAndWrappedIfMultiline("#%d: %s (failed at %s) (ran %s)", i+1, a.State, a.FinishAt, a.FinishAt.Sub(a.StartAt)) t.newline() } t.addIndent(-1) @@ -462,7 +462,7 @@ func (t *tui) renderPrunerReport(r *pruner.Report) { *pruner.FSReport completed bool } - all := make([]commonFS, 0, len(r.Pending) + len(r.Completed)) + all := make([]commonFS, 0, len(r.Pending)+len(r.Completed)) for i := range r.Pending { all = append(all, commonFS{&r.Pending[i], false}) } @@ -471,7 +471,8 @@ func (t *tui) renderPrunerReport(r *pruner.Report) { } switch state { - case pruner.Plan: fallthrough + case pruner.Plan: + fallthrough case pruner.PlanErr: return } @@ -499,7 +500,7 @@ func (t *tui) renderPrunerReport(r *pruner.Report) { t.write("[") t.write(times("=", progress)) t.write(">") - t.write(times("-", 80 - progress)) + t.write(times("-", 80-progress)) t.write("]") t.printf(" %d/%d snapshots", completedDestroyCount, totalDestroyCount) t.newline() @@ -519,9 +520,9 @@ func (t *tui) renderPrunerReport(r *pruner.Report) { if fs.LastError != "" { if strings.ContainsAny(fs.LastError, "\r\n") { t.printf("ERROR:") - t.printfDrawIndentedAndWrappedIfMultiline("%s\n", fs.LastError) + t.printfDrawIndentedAndWrappedIfMultiline("%s\n", fs.LastError) } else { - t.printfDrawIndentedAndWrappedIfMultiline("ERROR: %s\n", fs.LastError) + t.printfDrawIndentedAndWrappedIfMultiline("ERROR: %s\n", fs.LastError) } t.newline() continue @@ -531,7 +532,7 @@ func (t *tui) renderPrunerReport(r *pruner.Report) { len(fs.DestroyList), len(fs.SnapshotList)) if fs.completed { - t.printf( "Completed %s\n", pruneRuleActionStr) + t.printf("Completed %s\n", pruneRuleActionStr) continue } @@ -560,7 +561,6 @@ func rightPad(str string, length int, pad string) string { return str + times(pad, length-len(str)) } - func leftPad(str string, length int, pad string) string { if len(str) > length { return str[len(str)-length:] @@ -584,7 +584,7 @@ func (t *tui) drawBar(length int, bytes, totalBytes int64, changeCount int) { t.write("[") t.write(times("=", completedLength)) - t.write( string(arrowPositions[changeCount%len(arrowPositions)])) + t.write(string(arrowPositions[changeCount%len(arrowPositions)])) t.write(times("-", length-completedLength)) t.write("]") } diff --git a/client/stdinserver.go b/client/stdinserver.go index 5db5520..80aa1b4 100644 --- a/client/stdinserver.go +++ b/client/stdinserver.go @@ -1,13 +1,15 @@ package client import ( - "github.com/zrepl/zrepl/cli" "os" + "github.com/problame/go-netssh" + + "github.com/zrepl/zrepl/cli" + "github.com/zrepl/zrepl/config" + "context" "errors" - "github.com/problame/go-netssh" - "github.com/zrepl/zrepl/config" "log" "path" ) diff --git a/client/testcmd.go b/client/testcmd.go index 3066af9..81136bb 100644 --- a/client/testcmd.go +++ b/client/testcmd.go @@ -2,15 +2,17 @@ package client import ( "fmt" + "github.com/pkg/errors" "github.com/spf13/pflag" + "github.com/zrepl/zrepl/cli" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/filters" "github.com/zrepl/zrepl/zfs" ) -var TestCmd = &cli.Subcommand { +var TestCmd = &cli.Subcommand{ Use: "test", SetupSubcommands: func() []*cli.Subcommand { return []*cli.Subcommand{testFilter, testPlaceholder} @@ -18,13 +20,13 @@ var TestCmd = &cli.Subcommand { } var testFilterArgs struct { - job string - all bool + job string + all bool input string } var testFilter = &cli.Subcommand{ - Use: "filesystems --job JOB [--all | --input INPUT]", + Use: "filesystems --job JOB [--all | --input INPUT]", Short: "test filesystems filter specified in push or source job", SetupFlags: func(f *pflag.FlagSet) { f.StringVar(&testFilterArgs.job, "job", "", "the name of the push or source job") @@ -51,8 +53,10 @@ func runTestFilterCmd(subcommand *cli.Subcommand, args []string) error { return err } switch j := job.Ret.(type) { - case *config.SourceJob: confFilter = j.Filesystems - case *config.PushJob: confFilter = j.Filesystems + case *config.SourceJob: + confFilter = j.Filesystems + case *config.PushJob: + confFilter = j.Filesystems default: return fmt.Errorf("job type %T does not have filesystems filter", j) } diff --git a/client/version.go b/client/version.go index 9dcec05..a0dad26 100644 --- a/client/version.go +++ b/client/version.go @@ -2,23 +2,25 @@ package client import ( "fmt" + "os" + "github.com/spf13/pflag" + "github.com/zrepl/zrepl/cli" "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon" "github.com/zrepl/zrepl/version" - "os" ) var versionArgs struct { - Show string - Config *config.Config + Show string + Config *config.Config ConfigErr error } var VersionCmd = &cli.Subcommand{ - Use: "version", - Short: "print version of zrepl binary and running daemon", + Use: "version", + Short: "print version of zrepl binary and running daemon", NoRequireConfig: true, SetupFlags: func(f *pflag.FlagSet) { f.StringVar(&versionArgs.Show, "show", "", "version info to show (client|daemon)") diff --git a/config/config.go b/config/config.go index 64a8179..2cff88e 100644 --- a/config/config.go +++ b/config/config.go @@ -2,8 +2,6 @@ package config import ( "fmt" - "github.com/pkg/errors" - "github.com/zrepl/yaml-config" "io/ioutil" "log/syslog" "os" @@ -11,6 +9,9 @@ import ( "regexp" "strconv" "time" + + "github.com/pkg/errors" + "github.com/zrepl/yaml-config" ) type Config struct { @@ -34,11 +35,16 @@ type JobEnum struct { func (j JobEnum) Name() string { var name string switch v := j.Ret.(type) { - case *SnapJob: name = v.Name - case *PushJob: name = v.Name - case *SinkJob: name = v.Name - case *PullJob: name = v.Name - case *SourceJob: name = v.Name + case *SnapJob: + name = v.Name + case *PushJob: + name = v.Name + case *SinkJob: + name = v.Name + case *PullJob: + name = v.Name + case *SourceJob: + name = v.Name default: panic(fmt.Sprintf("unknown job type %T", v)) } @@ -46,38 +52,38 @@ func (j JobEnum) Name() string { } type ActiveJob struct { - Type string `yaml:"type"` - Name string `yaml:"name"` - Connect ConnectEnum `yaml:"connect"` - Pruning PruningSenderReceiver `yaml:"pruning"` - Debug JobDebugSettings `yaml:"debug,optional"` + Type string `yaml:"type"` + Name string `yaml:"name"` + Connect ConnectEnum `yaml:"connect"` + Pruning PruningSenderReceiver `yaml:"pruning"` + Debug JobDebugSettings `yaml:"debug,optional"` } type PassiveJob struct { - Type string `yaml:"type"` - Name string `yaml:"name"` - Serve ServeEnum `yaml:"serve"` - Debug JobDebugSettings `yaml:"debug,optional"` + Type string `yaml:"type"` + Name string `yaml:"name"` + Serve ServeEnum `yaml:"serve"` + Debug JobDebugSettings `yaml:"debug,optional"` } type SnapJob struct { - Type string `yaml:"type"` - Name string `yaml:"name"` - Pruning PruningLocal `yaml:"pruning"` - Debug JobDebugSettings `yaml:"debug,optional"` - Snapshotting SnapshottingEnum `yaml:"snapshotting"` - Filesystems FilesystemsFilter `yaml:"filesystems"` + Type string `yaml:"type"` + Name string `yaml:"name"` + Pruning PruningLocal `yaml:"pruning"` + Debug JobDebugSettings `yaml:"debug,optional"` + Snapshotting SnapshottingEnum `yaml:"snapshotting"` + Filesystems FilesystemsFilter `yaml:"filesystems"` } type PushJob struct { - ActiveJob `yaml:",inline"` - Snapshotting SnapshottingEnum `yaml:"snapshotting"` - Filesystems FilesystemsFilter `yaml:"filesystems"` + ActiveJob `yaml:",inline"` + Snapshotting SnapshottingEnum `yaml:"snapshotting"` + Filesystems FilesystemsFilter `yaml:"filesystems"` } type PullJob struct { ActiveJob `yaml:",inline"` - RootFS string `yaml:"root_fs"` + RootFS string `yaml:"root_fs"` Interval PositiveDurationOrManual `yaml:"interval"` } @@ -118,9 +124,9 @@ type SinkJob struct { } type SourceJob struct { - PassiveJob `yaml:",inline"` - Snapshotting SnapshottingEnum `yaml:"snapshotting"` - Filesystems FilesystemsFilter `yaml:"filesystems"` + PassiveJob `yaml:",inline"` + Snapshotting SnapshottingEnum `yaml:"snapshotting"` + Filesystems FilesystemsFilter `yaml:"filesystems"` } type FilesystemsFilter map[string]bool @@ -130,8 +136,8 @@ type SnapshottingEnum struct { } type SnapshottingPeriodic struct { - Type string `yaml:"type"` - Prefix string `yaml:"prefix"` + Type string `yaml:"type"` + Prefix string `yaml:"prefix"` Interval time.Duration `yaml:"interval,positive"` } @@ -191,7 +197,7 @@ type ConnectEnum struct { } type ConnectCommon struct { - Type string `yaml:"type"` + Type string `yaml:"type"` } type TCPConnect struct { @@ -223,8 +229,8 @@ type SSHStdinserverConnect struct { } type LocalConnect struct { - ConnectCommon `yaml:",inline"` - ListenerName string `yaml:"listener_name"` + ConnectCommon `yaml:",inline"` + ListenerName string `yaml:"listener_name"` ClientIdentity string `yaml:"client_identity"` } @@ -233,7 +239,7 @@ type ServeEnum struct { } type ServeCommon struct { - Type string `yaml:"type"` + Type string `yaml:"type"` } type TCPServe struct { @@ -253,12 +259,12 @@ type TLSServe struct { } type StdinserverServer struct { - ServeCommon `yaml:",inline"` + ServeCommon `yaml:",inline"` ClientIdentities []string `yaml:"client_identities"` } type LocalServe struct { - ServeCommon `yaml:",inline"` + ServeCommon `yaml:",inline"` ListenerName string `yaml:"listener_name"` } @@ -267,8 +273,8 @@ type PruningEnum struct { } type PruneKeepNotReplicated struct { - Type string `yaml:"type"` - KeepSnapshotAtCursor bool `yaml:"keep_snapshot_at_cursor,optional,default=true"` + Type string `yaml:"type"` + KeepSnapshotAtCursor bool `yaml:"keep_snapshot_at_cursor,optional,default=true"` } type PruneKeepLastN struct { @@ -277,8 +283,8 @@ type PruneKeepLastN struct { } type PruneKeepRegex struct { // FIXME rename to KeepRegex - Type string `yaml:"type"` - Regex string `yaml:"regex"` + Type string `yaml:"type"` + Regex string `yaml:"regex"` Negate bool `yaml:"negate,optional,default=false"` } @@ -301,7 +307,7 @@ type StdoutLoggingOutlet struct { type SyslogLoggingOutlet struct { LoggingOutletCommon `yaml:",inline"` Facility *SyslogFacility `yaml:"facility,optional,fromdefaults"` - RetryInterval time.Duration `yaml:"retry_interval,positive,default=10s"` + RetryInterval time.Duration `yaml:"retry_interval,positive,default=10s"` } type TCPLoggingOutlet struct { @@ -392,7 +398,7 @@ func (t *ConnectEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) "tcp": &TCPConnect{}, "tls": &TLSConnect{}, "ssh+stdinserver": &SSHStdinserverConnect{}, - "local": &LocalConnect{}, + "local": &LocalConnect{}, }) return } @@ -402,7 +408,7 @@ func (t *ServeEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) { "tcp": &TCPServe{}, "tls": &TLSServe{}, "stdinserver": &StdinserverServer{}, - "local" : &LocalServe{}, + "local": &LocalServe{}, }) return } @@ -420,7 +426,7 @@ func (t *PruningEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) func (t *SnapshottingEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) { t.Ret, err = enumUnmarshal(u, map[string]interface{}{ "periodic": &SnapshottingPeriodic{}, - "manual": &SnapshottingManual{}, + "manual": &SnapshottingManual{}, }) return } @@ -448,31 +454,51 @@ func (t *SyslogFacility) UnmarshalYAML(u func(interface{}, bool) error) (err err } var level syslog.Priority switch s { - case "kern": level = syslog.LOG_KERN - case "user": level = syslog.LOG_USER - case "mail": level = syslog.LOG_MAIL - case "daemon": level = syslog.LOG_DAEMON - case "auth": level = syslog.LOG_AUTH - case "syslog": level = syslog.LOG_SYSLOG - case "lpr": level = syslog.LOG_LPR - case "news": level = syslog.LOG_NEWS - case "uucp": level = syslog.LOG_UUCP - case "cron": level = syslog.LOG_CRON - case "authpriv": level = syslog.LOG_AUTHPRIV - case "ftp": level = syslog.LOG_FTP - case "local0": level = syslog.LOG_LOCAL0 - case "local1": level = syslog.LOG_LOCAL1 - case "local2": level = syslog.LOG_LOCAL2 - case "local3": level = syslog.LOG_LOCAL3 - case "local4": level = syslog.LOG_LOCAL4 - case "local5": level = syslog.LOG_LOCAL5 - case "local6": level = syslog.LOG_LOCAL6 - case "local7": level = syslog.LOG_LOCAL7 + case "kern": + level = syslog.LOG_KERN + case "user": + level = syslog.LOG_USER + case "mail": + level = syslog.LOG_MAIL + case "daemon": + level = syslog.LOG_DAEMON + case "auth": + level = syslog.LOG_AUTH + case "syslog": + level = syslog.LOG_SYSLOG + case "lpr": + level = syslog.LOG_LPR + case "news": + level = syslog.LOG_NEWS + case "uucp": + level = syslog.LOG_UUCP + case "cron": + level = syslog.LOG_CRON + case "authpriv": + level = syslog.LOG_AUTHPRIV + case "ftp": + level = syslog.LOG_FTP + case "local0": + level = syslog.LOG_LOCAL0 + case "local1": + level = syslog.LOG_LOCAL1 + case "local2": + level = syslog.LOG_LOCAL2 + case "local3": + level = syslog.LOG_LOCAL3 + case "local4": + level = syslog.LOG_LOCAL4 + case "local5": + level = syslog.LOG_LOCAL5 + case "local6": + level = syslog.LOG_LOCAL6 + case "local7": + level = syslog.LOG_LOCAL7 default: return fmt.Errorf("invalid syslog level: %q", s) } *t = SyslogFacility(level) - return nil + return nil } var ConfigFileDefaultLocations = []string{ diff --git a/config/config_global_test.go b/config/config_global_test.go index 51204b0..f782e9f 100644 --- a/config/config_global_test.go +++ b/config/config_global_test.go @@ -2,11 +2,12 @@ package config import ( "fmt" + "log/syslog" + "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/zrepl/yaml-config" - "log/syslog" - "testing" ) func testValidGlobalSection(t *testing.T, s string) *Config { @@ -24,7 +25,7 @@ jobs: ` _, err := ParseConfigBytes([]byte(jobdef)) require.NoError(t, err) - return testValidConfig(t, s + jobdef) + return testValidConfig(t, s+jobdef) } func TestOutletTypes(t *testing.T) { @@ -71,7 +72,7 @@ global: - type: prometheus listen: ':9091' `) - assert.Equal(t, ":9091", conf.Global.Monitoring[0].Ret.(*PrometheusMonitoring).Listen) + assert.Equal(t, ":9091", conf.Global.Monitoring[0].Ret.(*PrometheusMonitoring).Listen) } func TestSyslogLoggingOutletFacility(t *testing.T) { diff --git a/config/config_minimal_test.go b/config/config_minimal_test.go index 72f8752..74747bf 100644 --- a/config/config_minimal_test.go +++ b/config/config_minimal_test.go @@ -2,6 +2,7 @@ package config import ( "testing" + "github.com/stretchr/testify/assert" ) @@ -36,4 +37,4 @@ jobs: - type: last_n count: 1 `) -} \ No newline at end of file +} diff --git a/config/config_snapshotting_test.go b/config/config_snapshotting_test.go index e0f826b..1bfb7e2 100644 --- a/config/config_snapshotting_test.go +++ b/config/config_snapshotting_test.go @@ -2,9 +2,10 @@ package config import ( "fmt" - "github.com/stretchr/testify/assert" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestSnapshotting(t *testing.T) { @@ -37,7 +38,7 @@ jobs: interval: 10m ` - fillSnapshotting := func(s string) string {return fmt.Sprintf(tmpl, s)} + fillSnapshotting := func(s string) string { return fmt.Sprintf(tmpl, s) } var c *Config t.Run("manual", func(t *testing.T) { @@ -51,7 +52,7 @@ jobs: snp := c.Jobs[0].Ret.(*PushJob).Snapshotting.Ret.(*SnapshottingPeriodic) assert.Equal(t, "periodic", snp.Type) assert.Equal(t, 10*time.Minute, snp.Interval) - assert.Equal(t, "zrepl_" , snp.Prefix) + assert.Equal(t, "zrepl_", snp.Prefix) }) } diff --git a/config/retentiongrid.go b/config/retentiongrid.go index 58b2ff3..5a91bca 100644 --- a/config/retentiongrid.go +++ b/config/retentiongrid.go @@ -11,9 +11,9 @@ import ( type RetentionIntervalList []RetentionInterval type PruneGrid struct { - Type string `yaml:"type"` - Grid RetentionIntervalList `yaml:"grid"` - Regex string `yaml:"regex"` + Type string `yaml:"type"` + Grid RetentionIntervalList `yaml:"grid"` + Regex string `yaml:"regex"` } type RetentionInterval struct { diff --git a/daemon/control.go b/daemon/control.go index 6e60061..a19bd5a 100644 --- a/daemon/control.go +++ b/daemon/control.go @@ -12,6 +12,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/zrepl/zrepl/daemon/job" "github.com/zrepl/zrepl/daemon/nethelpers" "github.com/zrepl/zrepl/logger" @@ -43,24 +44,24 @@ func (j *controlJob) Status() *job.Status { return &job.Status{Type: job.TypeInt func (j *controlJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { return nil, false } var promControl struct { - requestBegin *prometheus.CounterVec + requestBegin *prometheus.CounterVec requestFinished *prometheus.HistogramVec } func (j *controlJob) RegisterMetrics(registerer prometheus.Registerer) { promControl.requestBegin = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "zrepl", - Subsystem: "control", - Name: "request_begin", - Help: "number of request we started to handle", + Namespace: "zrepl", + Subsystem: "control", + Name: "request_begin", + Help: "number of request we started to handle", }, []string{"endpoint"}) promControl.requestFinished = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "zrepl", - Subsystem: "control", - Name: "request_finished", - Help: "time it took a request to finih", - Buckets: []float64{1e-6, 10e-6, 100e-6, 500e-6, 1e-3,10e-3, 100e-3, 200e-3,400e-3,800e-3, 1, 10, 20}, + Namespace: "zrepl", + Subsystem: "control", + Name: "request_finished", + Help: "time it took a request to finih", + Buckets: []float64{1e-6, 10e-6, 100e-6, 500e-6, 1e-3, 10e-3, 100e-3, 200e-3, 400e-3, 800e-3, 1, 10, 20}, }, []string{"endpoint"}) registerer.MustRegister(promControl.requestBegin) registerer.MustRegister(promControl.requestFinished) @@ -114,7 +115,7 @@ func (j *controlJob) Run(ctx context.Context) { requestLogger{log: log, handler: jsonRequestResponder{func(decoder jsonDecoder) (interface{}, error) { type reqT struct { Name string - Op string + Op string } var req reqT if decoder(&req) != nil { @@ -136,8 +137,8 @@ func (j *controlJob) Run(ctx context.Context) { server := http.Server{ Handler: mux, // control socket is local, 1s timeout should be more than sufficient, even on a loaded system - WriteTimeout: 1*time.Second, - ReadTimeout: 1*time.Second, + WriteTimeout: 1 * time.Second, + ReadTimeout: 1 * time.Second, } outer: diff --git a/daemon/daemon.go b/daemon/daemon.go index 9f0e185..b4236d3 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -3,8 +3,16 @@ package daemon import ( "context" "fmt" + "os" + "os/signal" + "strings" + "sync" + "syscall" + "time" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/job" "github.com/zrepl/zrepl/daemon/job/reset" @@ -12,12 +20,6 @@ import ( "github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/version" - "os" - "os/signal" - "strings" - "sync" - "syscall" - "time" ) func Run(conf *config.Config) error { @@ -74,12 +76,11 @@ func Run(conf *config.Config) error { return errors.Errorf("unknown monitoring job #%d (type %T)", i, v) } if err != nil { - return errors.Wrapf(err,"cannot build monitorin gjob #%d", i) + return errors.Wrapf(err, "cannot build monitorin gjob #%d", i) } jobs.start(ctx, job, true) } - log.Info("starting daemon") // start regular jobs @@ -103,7 +104,7 @@ type jobs struct { // m protects all fields below it m sync.RWMutex wakeups map[string]wakeup.Func // by Job.Name - resets map[string]reset.Func // by Job.Name + resets map[string]reset.Func // by Job.Name jobs map[string]job.Job } diff --git a/daemon/filters/fsmapfilter.go b/daemon/filters/fsmapfilter.go index b6a785d..61178c3 100644 --- a/daemon/filters/fsmapfilter.go +++ b/daemon/filters/fsmapfilter.go @@ -2,10 +2,12 @@ package filters import ( "fmt" + "strings" + "github.com/pkg/errors" + "github.com/zrepl/zrepl/endpoint" "github.com/zrepl/zrepl/zfs" - "strings" ) type DatasetMapFilter struct { diff --git a/daemon/filters/fsvfilter.go b/daemon/filters/fsvfilter.go index 3abcca8..812d258 100644 --- a/daemon/filters/fsvfilter.go +++ b/daemon/filters/fsvfilter.go @@ -1,8 +1,9 @@ package filters import ( - "github.com/zrepl/zrepl/zfs" "strings" + + "github.com/zrepl/zrepl/zfs" ) type AnyFSVFilter struct{} @@ -17,7 +18,6 @@ func (AnyFSVFilter) Filter(t zfs.VersionType, name string) (accept bool, err err return true, nil } - type PrefixFilter struct { prefix string fstype zfs.VersionType diff --git a/daemon/job/build_jobs.go b/daemon/job/build_jobs.go index 2e4a271..9fe5f66 100644 --- a/daemon/job/build_jobs.go +++ b/daemon/job/build_jobs.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/pkg/errors" + "github.com/zrepl/zrepl/config" ) diff --git a/daemon/job/job.go b/daemon/job/job.go index a472bd8..ec844a5 100644 --- a/daemon/job/job.go +++ b/daemon/job/job.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/prometheus/client_golang/prometheus" + "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/zfs" ) @@ -29,7 +30,6 @@ func WithLogger(ctx context.Context, l Logger) context.Context { return context.WithValue(ctx, contextKeyLog, l) } - type Job interface { Name() string Run(ctx context.Context) @@ -44,15 +44,15 @@ type Type string const ( TypeInternal Type = "internal" - TypeSnap Type = "snap" - TypePush Type = "push" - TypeSink Type = "sink" - TypePull Type = "pull" - TypeSource Type = "source" + TypeSnap Type = "snap" + TypePush Type = "push" + TypeSink Type = "sink" + TypePull Type = "pull" + TypeSource Type = "source" ) type Status struct { - Type Type + Type Type JobSpecific interface{} } @@ -65,8 +65,8 @@ func (s *Status) MarshalJSON() ([]byte, error) { if err != nil { return nil, err } - m := map[string]json.RawMessage { - "type": typeJson, + m := map[string]json.RawMessage{ + "type": typeJson, string(s.Type): jobJSON, } return json.Marshal(m) @@ -94,12 +94,14 @@ func (s *Status) UnmarshalJSON(in []byte) (err error) { var st SnapJobStatus err = json.Unmarshal(jobJSON, &st) s.JobSpecific = &st - case TypePull: fallthrough + case TypePull: + fallthrough case TypePush: var st ActiveSideStatus err = json.Unmarshal(jobJSON, &st) s.JobSpecific = &st - case TypeSource: fallthrough + case TypeSource: + fallthrough case TypeSink: var st PassiveStatus err = json.Unmarshal(jobJSON, &st) diff --git a/daemon/job/snapjob.go b/daemon/job/snapjob.go index 7eb6bef..7614336 100644 --- a/daemon/job/snapjob.go +++ b/daemon/job/snapjob.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/filters" "github.com/zrepl/zrepl/daemon/job/wakeup" diff --git a/daemon/logging/logging_formatters.go b/daemon/logging/logging_formatters.go index 3f3a7a8..df09eac 100644 --- a/daemon/logging/logging_formatters.go +++ b/daemon/logging/logging_formatters.go @@ -4,11 +4,13 @@ import ( "bytes" "encoding/json" "fmt" + "time" + "github.com/fatih/color" "github.com/go-logfmt/logfmt" "github.com/pkg/errors" + "github.com/zrepl/zrepl/logger" - "time" ) const ( diff --git a/daemon/logging/logging_outlets.go b/daemon/logging/logging_outlets.go index b03d008..c2b8a01 100644 --- a/daemon/logging/logging_outlets.go +++ b/daemon/logging/logging_outlets.go @@ -4,12 +4,14 @@ import ( "bytes" "context" "crypto/tls" - "github.com/pkg/errors" - "github.com/zrepl/zrepl/logger" "io" "log/syslog" "net" "time" + + "github.com/pkg/errors" + + "github.com/zrepl/zrepl/logger" ) type EntryFormatter interface { diff --git a/daemon/main.go b/daemon/main.go index fae8c12..4bad05b 100644 --- a/daemon/main.go +++ b/daemon/main.go @@ -7,7 +7,7 @@ import ( type Logger = logger.Logger -var DaemonCmd = &cli.Subcommand { +var DaemonCmd = &cli.Subcommand{ Use: "daemon", Short: "run the zrepl daemon", Run: func(subcommand *cli.Subcommand, args []string) error { diff --git a/daemon/nethelpers/helpers.go b/daemon/nethelpers/helpers.go index 994b9d2..5c1362c 100644 --- a/daemon/nethelpers/helpers.go +++ b/daemon/nethelpers/helpers.go @@ -1,10 +1,11 @@ package nethelpers import ( - "github.com/pkg/errors" "net" "os" "path/filepath" + + "github.com/pkg/errors" ) func PreparePrivateSockpath(sockpath string) error { diff --git a/daemon/prometheus.go b/daemon/prometheus.go index 9db3554..71601ff 100644 --- a/daemon/prometheus.go +++ b/daemon/prometheus.go @@ -2,15 +2,17 @@ package daemon import ( "context" + "net" + "net/http" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/job" "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/rpc/dataconn/frameconn" "github.com/zrepl/zrepl/zfs" - "net" - "net/http" ) type prometheusJob struct { @@ -25,7 +27,7 @@ func newPrometheusJobFromConfig(in *config.PrometheusMonitoring) (*prometheusJob } var prom struct { - taskLogEntries *prometheus.CounterVec + taskLogEntries *prometheus.CounterVec } func init() { @@ -93,4 +95,3 @@ func (o prometheusJobOutlet) WriteEntry(entry logger.Entry) error { prom.taskLogEntries.WithLabelValues(o.jobName, entry.Level.String()).Inc() return nil } - diff --git a/daemon/pruner/pruner.go b/daemon/pruner/pruner.go index d81c8a4..a9983bb 100644 --- a/daemon/pruner/pruner.go +++ b/daemon/pruner/pruner.go @@ -3,17 +3,19 @@ package pruner import ( "context" "fmt" + "sort" + "strings" + "sync" + "time" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/pruning" "github.com/zrepl/zrepl/replication/logic/pdu" "github.com/zrepl/zrepl/util/envconst" - "sort" - "strings" - "sync" - "time" ) // Try to keep it compatible with gitub.com/zrepl/zrepl/endpoint.Endpoint @@ -53,7 +55,7 @@ type args struct { rules []pruning.KeepRule retryWait time.Duration considerSnapAtCursorReplicated bool - promPruneSecs prometheus.Observer + promPruneSecs prometheus.Observer } type Pruner struct { @@ -64,7 +66,7 @@ type Pruner struct { state State // State PlanErr - err error + err error // State Exec execQueue *execQueue @@ -75,7 +77,7 @@ type PrunerFactory struct { receiverRules []pruning.KeepRule retryWait time.Duration considerSnapAtCursorReplicated bool - promPruneSecs *prometheus.HistogramVec + promPruneSecs *prometheus.HistogramVec } type LocalPrunerFactory struct { @@ -137,11 +139,11 @@ func NewPrunerFactory(in config.PruningSenderReceiver, promPruneSecs *prometheus considerSnapAtCursorReplicated = considerSnapAtCursorReplicated || !knr.KeepSnapshotAtCursor } f := &PrunerFactory{ - senderRules: keepRulesSender, - receiverRules: keepRulesReceiver, - retryWait: envconst.Duration("ZREPL_PRUNER_RETRY_INTERVAL", 10 * time.Second), + senderRules: keepRulesSender, + receiverRules: keepRulesReceiver, + retryWait: envconst.Duration("ZREPL_PRUNER_RETRY_INTERVAL", 10*time.Second), considerSnapAtCursorReplicated: considerSnapAtCursorReplicated, - promPruneSecs: promPruneSecs, + promPruneSecs: promPruneSecs, } return f, nil } @@ -213,17 +215,17 @@ func (p *Pruner) Prune() { func (p *Pruner) prune(args args) { u := func(f func(*Pruner)) { - p.mtx.Lock() - defer p.mtx.Unlock() - f(p) - } + p.mtx.Lock() + defer p.mtx.Unlock() + f(p) + } // TODO support automatic retries // It is advisable to merge this code with package replication/driver before // That will likely require re-modelling struct fs like replication/driver.attempt, // including figuring out how to resume a plan after being interrupted by network errors // The non-retrying code in this package should move straight to replication/logic. doOneAttempt(&args, u) - } +} type Report struct { State string @@ -239,9 +241,9 @@ type FSReport struct { } type SnapshotReport struct { - Name string + Name string Replicated bool - Date time.Time + Date time.Time } func (p *Pruner) Report() *Report { @@ -250,9 +252,9 @@ func (p *Pruner) Report() *Report { r := Report{State: p.state.String()} - if p.err != nil { - r.Error = p.err.Error() - } + if p.err != nil { + r.Error = p.err.Error() + } if p.execQueue != nil { r.Pending, r.Completed = p.execQueue.Report() @@ -268,7 +270,7 @@ func (p *Pruner) State() State { } type fs struct { - path string + path string // permanent error during planning planErr error @@ -316,7 +318,7 @@ func (f *fs) Report() FSReport { if f.planErr != nil { r.LastError = f.planErr.Error() - } else if f.execErrLast != nil { + } else if f.execErrLast != nil { r.LastError = f.execErrLast.Error() } @@ -326,7 +328,7 @@ func (f *fs) Report() FSReport { } r.DestroyList = make([]SnapshotReport, len(f.destroyList)) - for i, snap := range f.destroyList{ + for i, snap := range f.destroyList { r.DestroyList[i] = snap.(snapshot).Report() } @@ -490,9 +492,9 @@ tfss_loop: }) for { - var pfs *fs + var pfs *fs u(func(pruner *Pruner) { - pfs = pruner.execQueue.Pop() + pfs = pruner.execQueue.Pop() }) if pfs == nil { break @@ -516,16 +518,15 @@ tfss_loop: hadErr := false for _, fsr := range rep.Completed { hadErr = hadErr || fsr.SkipReason.NotSkipped() && fsr.LastError != "" - } + } if hadErr { p.state = ExecErr } else { p.state = Done } }) - - } +} // attempts to exec pfs, puts it back into the queue with the result func doOneAttemptExec(a *args, u updater, pfs *fs) { @@ -558,20 +559,20 @@ func doOneAttemptExec(a *args, u updater, pfs *fs) { err = nil destroyFails := make([]*pdu.DestroySnapshotRes, 0) for _, reqDestroy := range destroyList { - res, ok := destroyResults[reqDestroy.Name] - if !ok { - err = fmt.Errorf("missing destroy-result for %s", reqDestroy.RelName()) - break - } else if res.Error != "" { - destroyFails = append(destroyFails, res) - } + res, ok := destroyResults[reqDestroy.Name] + if !ok { + err = fmt.Errorf("missing destroy-result for %s", reqDestroy.RelName()) + break + } else if res.Error != "" { + destroyFails = append(destroyFails, res) + } } if err == nil && len(destroyFails) > 0 { names := make([]string, len(destroyFails)) pairs := make([]string, len(destroyFails)) allSame := true lastMsg := destroyFails[0].Error - for i := 0; i < len(destroyFails); i++{ + for i := 0; i < len(destroyFails); i++ { allSame = allSame && destroyFails[i].Error == lastMsg relname := destroyFails[i].Snapshot.RelName() names[i] = relname diff --git a/daemon/pruner/pruner_queue.go b/daemon/pruner/pruner_queue.go index 840e93b..a824344 100644 --- a/daemon/pruner/pruner_queue.go +++ b/daemon/pruner/pruner_queue.go @@ -7,13 +7,13 @@ import ( ) type execQueue struct { - mtx sync.Mutex + mtx sync.Mutex pending, completed []*fs } func newExecQueue(cap int) *execQueue { q := execQueue{ - pending: make([]*fs, 0, cap), + pending: make([]*fs, 0, cap), completed: make([]*fs, 0, cap), } return &q @@ -55,7 +55,7 @@ func (q *execQueue) Pop() *fs { return fs } -func(q *execQueue) Put(fs *fs, err error, done bool) { +func (q *execQueue) Put(fs *fs, err error, done bool) { fs.mtx.Lock() fs.execErrLast = err if done || err != nil { @@ -79,5 +79,4 @@ func(q *execQueue) Put(fs *fs, err error, done bool) { }) q.mtx.Unlock() - -} \ No newline at end of file +} diff --git a/daemon/snapper/snapper.go b/daemon/snapper/snapper.go index 31e7b3c..8ba6ce8 100644 --- a/daemon/snapper/snapper.go +++ b/daemon/snapper/snapper.go @@ -1,18 +1,19 @@ package snapper import ( - "github.com/zrepl/zrepl/config" - "github.com/pkg/errors" - "time" "context" - "github.com/zrepl/zrepl/daemon/filters" "fmt" - "github.com/zrepl/zrepl/zfs" "sort" - "github.com/zrepl/zrepl/logger" "sync" -) + "time" + "github.com/pkg/errors" + + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/filters" + "github.com/zrepl/zrepl/logger" + "github.com/zrepl/zrepl/zfs" +) //go:generate stringer -type=SnapState type SnapState uint @@ -28,7 +29,7 @@ type snapProgress struct { state SnapState // SnapStarted, SnapDone, SnapError - name string + name string startAt time.Time // SnapDone @@ -44,13 +45,13 @@ type args struct { prefix string interval time.Duration fsf *filters.DatasetMapFilter - snapshotsTaken chan<-struct{} + snapshotsTaken chan<- struct{} } type Snapper struct { args args - mtx sync.Mutex + mtx sync.Mutex state State // set in state Plan, used in Waiting @@ -70,7 +71,7 @@ type Snapper struct { type State uint const ( - SyncUp State = 1< LabelLen { return fmt.Errorf("label %q exceeds max length (is %d, max %d)", label, len(label), LabelLen) } @@ -153,7 +153,7 @@ func Demux(ctx context.Context, rawListener transport.AuthenticatedListener, lab type labeledConnecter struct { label []byte - transport.Connecter + transport.Connecter } func (c labeledConnecter) Connect(ctx context.Context) (transport.Wire, error) { @@ -202,4 +202,3 @@ func MuxConnecter(rawConnecter transport.Connecter, labels []string, timeout tim } return ret, nil } - diff --git a/rpc/versionhandshake/versionhandshake.go b/rpc/versionhandshake/versionhandshake.go index 03835ee..952acec 100644 --- a/rpc/versionhandshake/versionhandshake.go +++ b/rpc/versionhandshake/versionhandshake.go @@ -17,7 +17,7 @@ import ( type HandshakeMessage struct { ProtocolVersion int - Extensions []string + Extensions []string } // A HandshakeError describes what went wrong during the handshake. @@ -25,7 +25,7 @@ type HandshakeMessage struct { type HandshakeError struct { msg string // If not nil, the underlying IO error that caused the handshake to fail. - IOError error + IOError error isAcceptError bool } @@ -36,10 +36,10 @@ func (e HandshakeError) Error() string { return e.msg } // Like with net.OpErr (Go issue 6163), a client failing to handshake // should be a temporary Accept error toward the Listener . func (e HandshakeError) Temporary() bool { - if e.isAcceptError { + if e.isAcceptError { return true } - te, ok := e.IOError.(interface{ Temporary() bool }); + te, ok := e.IOError.(interface{ Temporary() bool }) return ok && te.Temporary() } @@ -52,11 +52,11 @@ func (e HandshakeError) Timeout() bool { return false } -func hsErr(format string, args... interface{}) *HandshakeError { +func hsErr(format string, args ...interface{}) *HandshakeError { return &HandshakeError{msg: fmt.Sprintf(format, args...)} } -func hsIOErr(err error, format string, args... interface{}) *HandshakeError { +func hsIOErr(err error, format string, args ...interface{}) *HandshakeError { return &HandshakeError{IOError: err, msg: fmt.Sprintf(format, args...)} } @@ -145,7 +145,7 @@ func (m *HandshakeMessage) DecodeReader(r io.Reader, maxLen int) error { if exts[len(exts)-1] != "" { return hsErr("unexpected data trailing after last extension newline") } - m.Extensions = exts[0:len(exts)-1] + m.Extensions = exts[0 : len(exts)-1] return nil } @@ -160,7 +160,7 @@ const HandshakeMessageMaxLen = 16 * 4096 func DoHandshakeVersion(conn net.Conn, deadline time.Time, version int) *HandshakeError { ours := HandshakeMessage{ ProtocolVersion: version, - Extensions: nil, + Extensions: nil, } hsb, err := ours.Encode() if err != nil { diff --git a/rpc/versionhandshake/versionhandshake_test.go b/rpc/versionhandshake/versionhandshake_test.go index dd27c9d..b45b17f 100644 --- a/rpc/versionhandshake/versionhandshake_test.go +++ b/rpc/versionhandshake/versionhandshake_test.go @@ -3,13 +3,15 @@ package versionhandshake import ( "bytes" "fmt" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/zrepl/zrepl/util/socketpair" "io" "strings" "testing" "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/zrepl/zrepl/util/socketpair" ) func TestHandshakeMessage_Encode(t *testing.T) { @@ -23,8 +25,6 @@ func TestHandshakeMessage_Encode(t *testing.T) { enc := string(encB) t.Logf("enc: %s", enc) - - assert.False(t, strings.ContainsAny(enc[0:10], " ")) assert.True(t, enc[10] == ' ') @@ -45,7 +45,7 @@ func TestHandshakeMessage_Encode(t *testing.T) { func TestHandshakeMessage_Encode_InvalidProtocolVersion(t *testing.T) { - for _, pv := range []int{-1, 0, 10000, 10001} { + for _, pv := range []int{-1, 0, 10000, 10001} { t.Logf("testing invalid protocol version = %v", pv) msg := HandshakeMessage{ ProtocolVersion: pv, @@ -68,7 +68,7 @@ func TestHandshakeMessage_DecodeReader(t *testing.T) { require.NoError(t, err) out := HandshakeMessage{} - err = out.DecodeReader(bytes.NewReader([]byte(enc)), 4 * 4096) + err = out.DecodeReader(bytes.NewReader([]byte(enc)), 4*4096) assert.NoError(t, err) assert.Equal(t, 2342, out.ProtocolVersion) assert.Equal(t, 2, len(out.Extensions)) diff --git a/rpc/versionhandshake/versionhandshake_transport_wrappers.go b/rpc/versionhandshake/versionhandshake_transport_wrappers.go index 09ead7a..bc176dd 100644 --- a/rpc/versionhandshake/versionhandshake_transport_wrappers.go +++ b/rpc/versionhandshake/versionhandshake_transport_wrappers.go @@ -4,12 +4,13 @@ import ( "context" "net" "time" + "github.com/zrepl/zrepl/transport" ) type HandshakeConnecter struct { connecter transport.Connecter - timeout time.Duration + timeout time.Duration } func (c HandshakeConnecter) Connect(ctx context.Context) (transport.Wire, error) { @@ -31,17 +32,17 @@ func (c HandshakeConnecter) Connect(ctx context.Context) (transport.Wire, error) func Connecter(connecter transport.Connecter, timeout time.Duration) HandshakeConnecter { return HandshakeConnecter{ connecter: connecter, - timeout: timeout, + timeout: timeout, } } // wrapper type that performs a a protocol version handshake before returning the connection type HandshakeListener struct { - l transport.AuthenticatedListener + l transport.AuthenticatedListener timeout time.Duration } -func (l HandshakeListener) Addr() (net.Addr) { return l.l.Addr() } +func (l HandshakeListener) Addr() net.Addr { return l.l.Addr() } func (l HandshakeListener) Close() error { return l.l.Close() } diff --git a/transport/fromconfig/transport_fromconfig.go b/transport/fromconfig/transport_fromconfig.go index 0aa1426..def2450 100644 --- a/transport/fromconfig/transport_fromconfig.go +++ b/transport/fromconfig/transport_fromconfig.go @@ -4,7 +4,9 @@ package fromconfig import ( "fmt" + "github.com/pkg/errors" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/transport" "github.com/zrepl/zrepl/transport/local" @@ -13,10 +15,10 @@ import ( "github.com/zrepl/zrepl/transport/tls" ) -func ListenerFactoryFromConfig(g *config.Global, in config.ServeEnum) (transport.AuthenticatedListenerFactory,error) { +func ListenerFactoryFromConfig(g *config.Global, in config.ServeEnum) (transport.AuthenticatedListenerFactory, error) { var ( - l transport.AuthenticatedListenerFactory + l transport.AuthenticatedListenerFactory err error ) switch v := in.Ret.(type) { @@ -35,7 +37,6 @@ func ListenerFactoryFromConfig(g *config.Global, in config.ServeEnum) (transport return l, err } - func ConnecterFromConfig(g *config.Global, in config.ConnectEnum) (transport.Connecter, error) { var ( connecter transport.Connecter diff --git a/transport/local/connect_local.go b/transport/local/connect_local.go index ba390b8..76102ea 100644 --- a/transport/local/connect_local.go +++ b/transport/local/connect_local.go @@ -3,12 +3,13 @@ package local import ( "context" "fmt" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/transport" ) type LocalConnecter struct { - listenerName string + listenerName string clientIdentity string } @@ -26,4 +27,3 @@ func (c *LocalConnecter) Connect(dialCtx context.Context) (transport.Wire, error l := GetLocalListener(c.listenerName) return l.Connect(dialCtx, c.clientIdentity) } - diff --git a/transport/local/serve_local.go b/transport/local/serve_local.go index f7e42aa..3a7be66 100644 --- a/transport/local/serve_local.go +++ b/transport/local/serve_local.go @@ -3,20 +3,21 @@ package local import ( "context" "fmt" - "github.com/zrepl/zrepl/config" - "github.com/zrepl/zrepl/util/socketpair" "net" "sync" + + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/transport" + "github.com/zrepl/zrepl/util/socketpair" ) var localListeners struct { - m map[string]*LocalListener // listenerName -> listener + m map[string]*LocalListener // listenerName -> listener init sync.Once - mtx sync.Mutex + mtx sync.Mutex } -func GetLocalListener(listenerName string) (*LocalListener) { +func GetLocalListener(listenerName string) *LocalListener { localListeners.init.Do(func() { localListeners.m = make(map[string]*LocalListener) @@ -36,12 +37,12 @@ func GetLocalListener(listenerName string) (*LocalListener) { type connectRequest struct { clientIdentity string - callback chan connectResult + callback chan connectResult } type connectResult struct { conn transport.Wire - err error + err error } type LocalListener struct { @@ -60,7 +61,7 @@ func (l *LocalListener) Connect(dialCtx context.Context, clientIdentity string) // place request req := connectRequest{ clientIdentity: clientIdentity, - callback: make(chan connectResult), + callback: make(chan connectResult), } select { case l.connects <- req: @@ -70,7 +71,7 @@ func (l *LocalListener) Connect(dialCtx context.Context, clientIdentity string) // wait for listener response select { - case connRes := <- req.callback: + case connRes := <-req.callback: conn, err = connRes.conn, connRes.err case <-dialCtx.Done(): close(req.callback) // sending to the channel afterwards will panic, the listener has to catch this @@ -88,7 +89,7 @@ func (localAddr) Network() string { return "local" } func (a localAddr) String() string { return a.S } -func (l *LocalListener) Addr() (net.Addr) { return localAddr{""} } +func (l *LocalListener) Addr() net.Addr { return localAddr{""} } func (l *LocalListener) Accept(ctx context.Context) (*transport.AuthConn, error) { respondToRequest := func(req connectRequest, res connectResult) (err error) { @@ -163,12 +164,12 @@ func (l *LocalListener) Close() error { return nil } -func LocalListenerFactoryFromConfig(g *config.Global, in *config.LocalServe) (transport.AuthenticatedListenerFactory,error) { +func LocalListenerFactoryFromConfig(g *config.Global, in *config.LocalServe) (transport.AuthenticatedListenerFactory, error) { if in.ListenerName == "" { return nil, fmt.Errorf("ListenerName must not be empty") } listenerName := in.ListenerName - lf := func() (transport.AuthenticatedListener,error) { + lf := func() (transport.AuthenticatedListener, error) { return GetLocalListener(listenerName), nil } return lf, nil diff --git a/transport/ssh/connect_ssh.go b/transport/ssh/connect_ssh.go index d669b88..7aaea43 100644 --- a/transport/ssh/connect_ssh.go +++ b/transport/ssh/connect_ssh.go @@ -2,12 +2,14 @@ package ssh import ( "context" + "time" + "github.com/jinzhu/copier" "github.com/pkg/errors" "github.com/problame/go-netssh" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/transport" - "time" ) type SSHStdinserverConnecter struct { diff --git a/transport/ssh/serve_stdinserver.go b/transport/ssh/serve_stdinserver.go index 39bfba8..03e0d51 100644 --- a/transport/ssh/serve_stdinserver.go +++ b/transport/ssh/serve_stdinserver.go @@ -1,19 +1,21 @@ package ssh import ( - "github.com/problame/go-netssh" - "github.com/zrepl/zrepl/config" - "github.com/zrepl/zrepl/daemon/nethelpers" - "github.com/zrepl/zrepl/transport" + "context" "fmt" "net" "path" - "context" - "github.com/pkg/errors" "sync/atomic" + + "github.com/pkg/errors" + "github.com/problame/go-netssh" + + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/nethelpers" + "github.com/zrepl/zrepl/transport" ) -func MultiStdinserverListenerFactoryFromConfig(g *config.Global, in *config.StdinserverServer) (transport.AuthenticatedListenerFactory,error) { +func MultiStdinserverListenerFactoryFromConfig(g *config.Global, in *config.StdinserverServer) (transport.AuthenticatedListenerFactory, error) { for _, ci := range in.ClientIdentities { if err := transport.ValidateClientIdentity(ci); err != nil { @@ -24,7 +26,7 @@ func MultiStdinserverListenerFactoryFromConfig(g *config.Global, in *config.Stdi clientIdentities := in.ClientIdentities sockdir := g.Serve.StdinServer.SockDir - lf := func() (transport.AuthenticatedListener,error) { + lf := func() (transport.AuthenticatedListener, error) { return multiStdinserverListenerFromClientIdentities(sockdir, clientIdentities) } @@ -33,13 +35,13 @@ func MultiStdinserverListenerFactoryFromConfig(g *config.Global, in *config.Stdi type multiStdinserverAcceptRes struct { conn *transport.AuthConn - err error + err error } type MultiStdinserverListener struct { listeners []*stdinserverListener - accepts chan multiStdinserverAcceptRes - closed int32 + accepts chan multiStdinserverAcceptRes + closed int32 } // client identities must be validated @@ -48,7 +50,7 @@ func multiStdinserverListenerFromClientIdentities(sockdir string, cis []string) var err error for _, ci := range cis { sockpath := path.Join(sockdir, ci) - l := &stdinserverListener{clientIdentity: ci} + l := &stdinserverListener{clientIdentity: ci} if err = nethelpers.PreparePrivateSockpath(sockpath); err != nil { break } @@ -66,7 +68,7 @@ func multiStdinserverListenerFromClientIdentities(sockdir string, cis []string) return &MultiStdinserverListener{listeners: listeners}, nil } -func (m *MultiStdinserverListener) Accept(ctx context.Context) (*transport.AuthConn, error){ +func (m *MultiStdinserverListener) Accept(ctx context.Context) (*transport.AuthConn, error) { if m.accepts == nil { m.accepts = make(chan multiStdinserverAcceptRes, len(m.listeners)) @@ -80,7 +82,7 @@ func (m *MultiStdinserverListener) Accept(ctx context.Context) (*transport.AuthC } } - res := <- m.accepts + res := <-m.accepts return res.conn, res.err } @@ -116,7 +118,7 @@ func (m *MultiStdinserverListener) Close() error { // a single stdinserverListener (part of multiStinserverListener) type stdinserverListener struct { - l *netssh.Listener + l *netssh.Listener clientIdentity string } diff --git a/transport/tcp/serve_tcp.go b/transport/tcp/serve_tcp.go index a6b8107..45d3851 100644 --- a/transport/tcp/serve_tcp.go +++ b/transport/tcp/serve_tcp.go @@ -1,15 +1,17 @@ package tcp import ( - "github.com/zrepl/zrepl/config" - "net" - "github.com/pkg/errors" "context" + "net" + + "github.com/pkg/errors" + + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/transport" ) type ipMapEntry struct { - ip net.IP + ip net.IP ident string } @@ -25,7 +27,7 @@ func ipMapFromConfig(clients map[string]string) (*ipMap, error) { return nil, errors.Errorf("cannot parse client IP %q", clientIPString) } if err := transport.ValidateClientIdentity(clientIdent); err != nil { - return nil, errors.Wrapf(err,"invalid client identity for IP %q", clientIPString) + return nil, errors.Wrapf(err, "invalid client identity for IP %q", clientIPString) } entries = append(entries, ipMapEntry{clientIP, clientIdent}) } @@ -79,4 +81,3 @@ func (f *TCPAuthListener) Accept(ctx context.Context) (*transport.AuthConn, erro } return transport.NewAuthConn(nc, clientIdent), nil } - diff --git a/transport/tls/connect_tls.go b/transport/tls/connect_tls.go index ea578d4..ebe447f 100644 --- a/transport/tls/connect_tls.go +++ b/transport/tls/connect_tls.go @@ -6,6 +6,7 @@ import ( "net" "github.com/pkg/errors" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/tlsconf" "github.com/zrepl/zrepl/transport" diff --git a/transport/tls/serve_tls.go b/transport/tls/serve_tls.go index 21aafe4..c598b61 100644 --- a/transport/tls/serve_tls.go +++ b/transport/tls/serve_tls.go @@ -1,16 +1,18 @@ package tls import ( + "context" "crypto/tls" "crypto/x509" "fmt" - "github.com/pkg/errors" - "github.com/zrepl/zrepl/config" - "github.com/zrepl/zrepl/transport" - "github.com/zrepl/zrepl/tlsconf" "net" "time" - "context" + + "github.com/pkg/errors" + + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/tlsconf" + "github.com/zrepl/zrepl/transport" ) type TLSListenerFactory struct { @@ -18,10 +20,10 @@ type TLSListenerFactory struct { clientCA *x509.CertPool serverCert tls.Certificate handshakeTimeout time.Duration - clientCNs map[string]struct{} + clientCNs map[string]struct{} } -func TLSListenerFactoryFromConfig(c *config.Global, in *config.TLSServe) (transport.AuthenticatedListenerFactory,error) { +func TLSListenerFactoryFromConfig(c *config.Global, in *config.TLSServe) (transport.AuthenticatedListenerFactory, error) { address := in.Listen handshakeTimeout := in.HandshakeTimeout @@ -85,5 +87,3 @@ func (l tlsAuthListener) Accept(ctx context.Context) (*transport.AuthConn, error adaptor := newWireAdaptor(tlsConn, tcpConn) return transport.NewAuthConn(adaptor, cn), nil } - - diff --git a/util/bytecounter/bytecounter_streamcopier_test.go b/util/bytecounter/bytecounter_streamcopier_test.go index 29611e0..f4f8b1e 100644 --- a/util/bytecounter/bytecounter_streamcopier_test.go +++ b/util/bytecounter/bytecounter_streamcopier_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/zrepl/zrepl/zfs" ) diff --git a/util/chainlock/chainlock.go b/util/chainlock/chainlock.go index 6e7b5e5..ef1d971 100644 --- a/util/chainlock/chainlock.go +++ b/util/chainlock/chainlock.go @@ -39,4 +39,4 @@ func (l *L) NewCond() *sync.Cond { func (l *L) DropWhile(f func()) { defer l.Unlock().Lock() f() -} \ No newline at end of file +} diff --git a/util/chunking_test.go b/util/chunking_test.go index 3ca48f2..c0641ad 100644 --- a/util/chunking_test.go +++ b/util/chunking_test.go @@ -3,11 +3,12 @@ package util import ( "bytes" "encoding/binary" - "github.com/stretchr/testify/assert" "io" "reflect" "testing" "testing/quick" + + "github.com/stretchr/testify/assert" ) func TestUnchunker(t *testing.T) { diff --git a/util/contextflexibletimeout_test.go b/util/contextflexibletimeout_test.go index e6a1128..4ff1dfd 100644 --- a/util/contextflexibletimeout_test.go +++ b/util/contextflexibletimeout_test.go @@ -2,10 +2,11 @@ package util import ( "context" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "testing" "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestContextWithOptionalDeadline(t *testing.T) { diff --git a/util/io.go b/util/io.go index 8448e9d..32e35ec 100644 --- a/util/io.go +++ b/util/io.go @@ -101,16 +101,16 @@ func (c *ChainedReader) Read(buf []byte) (n int, err error) { } type ByteCounterReader struct { - reader io.ReadCloser + reader io.ReadCloser // called & accessed synchronously during Read, no external access - cb func(full int64) - cbEvery time.Duration - lastCbAt time.Time + cb func(full int64) + cbEvery time.Duration + lastCbAt time.Time bytesSinceLastCb int64 // set atomically because it may be read by multiple threads - bytes int64 + bytes int64 } func NewByteCounterReader(reader io.ReadCloser) *ByteCounterReader { diff --git a/util/iocommand.go b/util/iocommand.go index fe5f9a3..31ff8cb 100644 --- a/util/iocommand.go +++ b/util/iocommand.go @@ -4,18 +4,19 @@ import ( "bytes" "context" "fmt" - "github.com/zrepl/zrepl/util/envconst" "io" "os" "os/exec" "syscall" "time" + + "github.com/zrepl/zrepl/util/envconst" ) // An IOCommand exposes a forked process's std(in|out|err) through the io.ReadWriteCloser interface. type IOCommand struct { Cmd *exec.Cmd - kill context.CancelFunc + kill context.CancelFunc Stdin io.WriteCloser Stdout io.ReadCloser StderrBuf *bytes.Buffer diff --git a/zfs/datasetpath_visitor_test.go b/zfs/datasetpath_visitor_test.go index 7e348f2..731d5a6 100644 --- a/zfs/datasetpath_visitor_test.go +++ b/zfs/datasetpath_visitor_test.go @@ -1,8 +1,9 @@ package zfs import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestNewDatasetPathTree(t *testing.T) { diff --git a/zfs/mapping.go b/zfs/mapping.go index dd874e2..ca2c794 100644 --- a/zfs/mapping.go +++ b/zfs/mapping.go @@ -13,7 +13,8 @@ type DatasetFilter interface { func NoFilter() DatasetFilter { return noFilter{} } -type noFilter struct {} + +type noFilter struct{} var _ DatasetFilter = noFilter{} diff --git a/zfs/replication_history.go b/zfs/replication_history.go index d420047..acc0a51 100644 --- a/zfs/replication_history.go +++ b/zfs/replication_history.go @@ -2,8 +2,9 @@ package zfs import ( "fmt" - "github.com/pkg/errors" "strconv" + + "github.com/pkg/errors" ) const ReplicationCursorBookmarkName = "zrepl_replication_cursor" diff --git a/zfs/resume_token_test.go b/zfs/resume_token_test.go index 4a35595..d4acd6f 100644 --- a/zfs/resume_token_test.go +++ b/zfs/resume_token_test.go @@ -2,9 +2,11 @@ package zfs_test import ( "context" - "github.com/stretchr/testify/assert" - "github.com/zrepl/zrepl/zfs" "testing" + + "github.com/stretchr/testify/assert" + + "github.com/zrepl/zrepl/zfs" ) type ResumeTokenTest struct { diff --git a/zfs/versions.go b/zfs/versions.go index 29303c0..d211621 100644 --- a/zfs/versions.go +++ b/zfs/versions.go @@ -5,11 +5,12 @@ import ( "context" "errors" "fmt" - "github.com/prometheus/client_golang/prometheus" "io" "strconv" "strings" "time" + + "github.com/prometheus/client_golang/prometheus" ) type VersionType string diff --git a/zfs/zfs.go b/zfs/zfs.go index 40b1434..257f509 100644 --- a/zfs/zfs.go +++ b/zfs/zfs.go @@ -15,9 +15,11 @@ import ( "time" "context" - "github.com/prometheus/client_golang/prometheus" "regexp" "strconv" + + "github.com/prometheus/client_golang/prometheus" + "github.com/zrepl/zrepl/util/envconst" ) @@ -351,7 +353,7 @@ type readErrRecorder struct { type sendStreamCopierError struct { isReadErr bool // if false, it's a write error - err error + err error } func (e sendStreamCopierError) Error() string { @@ -362,7 +364,7 @@ func (e sendStreamCopierError) Error() string { } } -func (e sendStreamCopierError) IsReadError() bool { return e.isReadErr } +func (e sendStreamCopierError) IsReadError() bool { return e.isReadErr } func (e sendStreamCopierError) IsWriteError() bool { return !e.isReadErr } func (r *readErrRecorder) Read(p []byte) (n int, err error) { @@ -410,13 +412,12 @@ func pipeWithCapacityHint(capacity int) (r, w *os.File, err error) { } type sendStream struct { - cmd *exec.Cmd + cmd *exec.Cmd kill context.CancelFunc - closeMtx sync.Mutex + closeMtx sync.Mutex stdoutReader *os.File - opErr error - + opErr error } func (s *sendStream) Read(p []byte) (n int, err error) { @@ -484,7 +485,7 @@ func (s *sendStream) killAndWait(precedingReadErr error) error { if closePipeErr == nil { // avoid double-closes in case anything below doesn't work // and someone calls Close again - s.stdoutReader = nil + s.stdoutReader = nil } else { return closePipeErr } @@ -493,7 +494,7 @@ func (s *sendStream) killAndWait(precedingReadErr error) error { // we managed to tear things down, no let's give the user some pretty *ZFSError if exitErr != nil { s.opErr = &ZFSError{ - Stderr: exitErr.Stderr, + Stderr: exitErr.Stderr, WaitErr: exitErr, } } else { @@ -545,15 +546,14 @@ func ZFSSend(ctx context.Context, fs string, from, to string, token string) (str stdoutWriter.Close() stream := &sendStream{ - cmd: cmd, - kill: cancel, + cmd: cmd, + kill: cancel, stdoutReader: stdoutReader, } return newSendStreamCopier(stream), err } - type DrySendType string const ( @@ -563,25 +563,27 @@ const ( func DrySendTypeFromString(s string) (DrySendType, error) { switch s { - case string(DrySendTypeFull): return DrySendTypeFull, nil - case string(DrySendTypeIncremental): return DrySendTypeIncremental, nil + case string(DrySendTypeFull): + return DrySendTypeFull, nil + case string(DrySendTypeIncremental): + return DrySendTypeIncremental, nil default: return "", fmt.Errorf("unknown dry send type %q", s) } } type DrySendInfo struct { - Type DrySendType - Filesystem string // parsed from To field - From, To string // direct copy from ZFS output - SizeEstimate int64 // -1 if size estimate is not possible + Type DrySendType + Filesystem string // parsed from To field + From, To string // direct copy from ZFS output + SizeEstimate int64 // -1 if size estimate is not possible } var ( // keep same number of capture groups for unmarshalInfoLine homogenity sendDryRunInfoLineRegexFull = regexp.MustCompile(`^(full)\t()([^\t]+@[^\t]+)\t([0-9]+)$`) - // cannot enforce '[#@]' in incremental source, see test cases + // cannot enforce '[#@]' in incremental source, see test cases sendDryRunInfoLineRegexIncremental = regexp.MustCompile(`^(incremental)\t([^\t]+)\t([^\t]+@[^\t]+)\t([0-9]+)$`) ) @@ -602,7 +604,6 @@ func (s *DrySendInfo) unmarshalZFSOutput(output []byte) (err error) { return fmt.Errorf("no match for info line (regex1 %s) (regex2 %s)", sendDryRunInfoLineRegexFull, sendDryRunInfoLineRegexIncremental) } - // unmarshal info line, looks like this: // full zroot/test/a@1 5389768 // incremental zroot/test/a@1 zroot/test/a@2 5383936 @@ -653,19 +654,19 @@ func ZFSSendDry(fs string, from, to string, token string) (_ *DrySendInfo, err e * Redacted send & recv will bring this functionality, see * https://github.com/openzfs/openzfs/pull/484 */ - fromAbs, err := absVersion(fs, from) - if err != nil { - return nil, fmt.Errorf("error building abs version for 'from': %s", err) - } - toAbs, err := absVersion(fs, to) - if err != nil { - return nil, fmt.Errorf("error building abs version for 'to': %s", err) - } - return &DrySendInfo{ - Type: DrySendTypeIncremental, - Filesystem: fs, - From: fromAbs, - To: toAbs, + fromAbs, err := absVersion(fs, from) + if err != nil { + return nil, fmt.Errorf("error building abs version for 'from': %s", err) + } + toAbs, err := absVersion(fs, to) + if err != nil { + return nil, fmt.Errorf("error building abs version for 'to': %s", err) + } + return &DrySendInfo{ + Type: DrySendTypeIncremental, + Filesystem: fs, + From: fromAbs, + To: toAbs, SizeEstimate: -1}, nil } @@ -784,7 +785,7 @@ func ZFSRecv(ctx context.Context, fs string, streamCopier StreamCopier, opts Rec if err != nil { return err } - + cmd.Stdin = stdin if err = cmd.Start(); err != nil { @@ -794,7 +795,7 @@ func ZFSRecv(ctx context.Context, fs string, streamCopier StreamCopier, opts Rec } stdin.Close() defer stdinWriter.Close() - + pid := cmd.Process.Pid debug := func(format string, args ...interface{}) { debug("recv: pid=%v: %s", pid, fmt.Sprintf(format, args...)) @@ -823,7 +824,7 @@ func ZFSRecv(ctx context.Context, fs string, streamCopier StreamCopier, opts Rec copierErr := <-copierErrChan debug("copierErr: %T %s", copierErr, copierErr) if copierErr != nil { - cancelCmd() + cancelCmd() } waitErr := <-waitErrChan @@ -838,7 +839,7 @@ func ZFSRecv(ctx context.Context, fs string, streamCopier StreamCopier, opts Rec type ClearResumeTokenError struct { ZFSOutput []byte - CmdError error + CmdError error } func (e ClearResumeTokenError) Error() string { @@ -947,13 +948,27 @@ const ( func (s zfsPropertySource) zfsGetSourceFieldPrefixes() []string { prefixes := make([]string, 0, 7) - if s&sourceLocal != 0 {prefixes = append(prefixes, "local")} - if s&sourceDefault != 0 {prefixes = append(prefixes, "default")} - if s&sourceInherited != 0 {prefixes = append(prefixes, "inherited")} - if s&sourceNone != 0 {prefixes = append(prefixes, "-")} - if s&sourceTemporary != 0 { prefixes = append(prefixes, "temporary")} - if s&sourceReceived != 0 { prefixes = append(prefixes, "received")} - if s == sourceAny { prefixes = append(prefixes, "") } + if s&sourceLocal != 0 { + prefixes = append(prefixes, "local") + } + if s&sourceDefault != 0 { + prefixes = append(prefixes, "default") + } + if s&sourceInherited != 0 { + prefixes = append(prefixes, "inherited") + } + if s&sourceNone != 0 { + prefixes = append(prefixes, "-") + } + if s&sourceTemporary != 0 { + prefixes = append(prefixes, "temporary") + } + if s&sourceReceived != 0 { + prefixes = append(prefixes, "received") + } + if s == sourceAny { + prefixes = append(prefixes, "") + } return prefixes } @@ -992,7 +1007,7 @@ func zfsGet(path string, props []string, allowedSources zfsPropertySource) (*ZFS return nil, fmt.Errorf("zfs get did not return property,value,source tuples") } for _, p := range allowedPrefixes { - if strings.HasPrefix(fields[2],p) { + if strings.HasPrefix(fields[2], p) { res.m[fields[0]] = fields[1] break } @@ -1010,8 +1025,10 @@ func ZFSDestroy(dataset string) (err error) { filesystem = dataset } else { switch dataset[idx] { - case '@': dstype = "snapshot" - case '#': dstype = "bookmark" + case '@': + dstype = "snapshot" + case '#': + dstype = "bookmark" } filesystem = dataset[:idx] } diff --git a/zfs/zfs_test.go b/zfs/zfs_test.go index 217d765..f9ea860 100644 --- a/zfs/zfs_test.go +++ b/zfs/zfs_test.go @@ -1,8 +1,9 @@ package zfs import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestZFSListHandlesProducesZFSErrorOnNonZeroExit(t *testing.T) { @@ -33,8 +34,8 @@ func TestDatasetPathTrimNPrefixComps(t *testing.T) { func TestZFSPropertySource(t *testing.T) { - tcs := []struct{ - in zfsPropertySource + tcs := []struct { + in zfsPropertySource exp []string }{ { @@ -43,11 +44,11 @@ func TestZFSPropertySource(t *testing.T) { exp: []string{"local", "default", "inherited", "-", "temporary", "received", ""}, }, { - in: sourceTemporary, + in: sourceTemporary, exp: []string{"temporary"}, }, { - in: sourceLocal|sourceInherited, + in: sourceLocal | sourceInherited, exp: []string{"local", "inherited"}, }, } @@ -137,9 +138,9 @@ size 10518512 incrementalWithSpacesInIntermediateComponent := "\nincremental\tblaffoo\tpool1/otherjob/another ds with spaces/childfs@blaffoo2\t624\nsize\t624\n" type tc struct { - name string - in string - exp *DrySendInfo + name string + in string + exp *DrySendInfo expErr bool } @@ -147,10 +148,10 @@ size 10518512 { name: "fullSend", in: fullSend, exp: &DrySendInfo{ - Type: DrySendTypeFull, - Filesystem: "zroot/test/a", - From: "", - To: "zroot/test/a@1", + Type: DrySendTypeFull, + Filesystem: "zroot/test/a", + From: "", + To: "zroot/test/a@1", SizeEstimate: 5389768, }, }, @@ -158,7 +159,7 @@ size 10518512 name: "incSend", in: incSend, exp: &DrySendInfo{ Type: DrySendTypeIncremental, - Filesystem: "zroot/test/a", + Filesystem: "zroot/test/a", From: "zroot/test/a@1", To: "zroot/test/a@2", SizeEstimate: 5383936, @@ -168,16 +169,16 @@ size 10518512 name: "incSendBookmark", in: incSendBookmark, exp: &DrySendInfo{ Type: DrySendTypeIncremental, - Filesystem: "zroot/test/a", + Filesystem: "zroot/test/a", From: "zroot/test/a#1", To: "zroot/test/a@2", SizeEstimate: 5383312, }, }, - { + { name: "incNoToken", in: incNoToken, exp: &DrySendInfo{ - Type: DrySendTypeIncremental, + Type: DrySendTypeIncremental, Filesystem: "zroot/test/a", // as can be seen in the string incNoToken, // we cannot infer whether the incremental source is a snapshot or bookmark @@ -189,10 +190,10 @@ size 10518512 { name: "fullNoToken", in: fullNoToken, exp: &DrySendInfo{ - Type: DrySendTypeFull, - Filesystem: "zroot/test/a", - From: "", - To: "zroot/test/a@3", + Type: DrySendTypeFull, + Filesystem: "zroot/test/a", + From: "", + To: "zroot/test/a@3", SizeEstimate: 10518512, }, },