diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..7d99bc4 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,10 @@ +linters: + enable: + - goimports + +issues: + exclude-rules: + - path: _test\.go + linters: + - errcheck + diff --git a/Makefile b/Makefile index ade61e7..38e83a6 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,12 @@ generate: #not part of the build, must do that manually go generate -x ./... format: - goimports -srcdir . -local 'github.com/zrepl/zrepl' -w $(shell find . -type f -name '*.go' -not -path "./vendor/*") + # FIXME build dependency + goimports -srcdir . -local 'github.com/zrepl/zrepl' -w $(shell find . -type f -name '*.go' -not -path "./vendor/*" -not -name '*.pb.go' -not -name '*_enumer.go') + +lint: + # v1.15.0 at the time of writing FIXME build dependency + golangci-lint run ./... build: @echo "INFO: In case of missing dependencies, run 'make vendordeps'" diff --git a/build/build.go b/build/build.go index 8f31bd6..931fc64 100644 --- a/build/build.go +++ b/build/build.go @@ -12,7 +12,6 @@ package main import ( "fmt" - _ "fmt" _ "github.com/alvaroloes/enumer" _ "github.com/golang/protobuf/protoc-gen-go" diff --git a/cli/cli.go b/cli/cli.go index 8e2ede0..e84affe 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -25,7 +25,10 @@ var bashcompCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { if len(args) != 1 { fmt.Fprintf(os.Stderr, "specify exactly one positional agument\n") - cmd.Usage() + err := cmd.Usage() + if err != nil { + panic(err) + } os.Exit(1) } if err := rootCmd.GenBashCompletionFile(args[0]); err != nil { diff --git a/client/configcheck.go b/client/configcheck.go index 8ecd981..10c55d5 100644 --- a/client/configcheck.go +++ b/client/configcheck.go @@ -31,13 +31,21 @@ var ConfigcheckCmd = &cli.Subcommand{ }, Run: func(subcommand *cli.Subcommand, args []string) error { formatMap := map[string]func(interface{}){ - "": func(i interface{}) {}, - "pretty": func(i interface{}) { pretty.Println(i) }, + "": func(i interface{}) {}, + "pretty": func(i interface{}) { + if _, err := pretty.Println(i); err != nil { + panic(err) + } + }, "json": func(i interface{}) { - json.NewEncoder(os.Stdout).Encode(subcommand.Config()) + if err := json.NewEncoder(os.Stdout).Encode(subcommand.Config()); err != nil { + panic(err) + } }, "yaml": func(i interface{}) { - yaml.NewEncoder(os.Stdout).Encode(subcommand.Config()) + if err := yaml.NewEncoder(os.Stdout).Encode(subcommand.Config()); err != nil { + panic(err) + } }, } diff --git a/client/jsonclient.go b/client/jsonclient.go index 9f66aec..3152b99 100644 --- a/client/jsonclient.go +++ b/client/jsonclient.go @@ -36,7 +36,7 @@ func jsonRequestResponse(c http.Client, endpoint string, req interface{}, res in if resp.StatusCode != http.StatusOK { var msg bytes.Buffer - io.CopyN(&msg, resp.Body, 4096) + _, _ = io.CopyN(&msg, resp.Body, 4096) // ignore error, just display what we got return errors.Errorf("%s", msg.String()) } diff --git a/client/migrate.go b/client/migrate.go index 22a7270..47d5f22 100644 --- a/client/migrate.go +++ b/client/migrate.go @@ -23,11 +23,6 @@ var ( } ) -type migration struct { - name string - method func(config *config.Config, args []string) error -} - var migrations = []*cli.Subcommand{ &cli.Subcommand{ Use: "0.0.X:0.1:placeholder", diff --git a/client/status.go b/client/status.go index 0e7267a..9c85a9e 100644 --- a/client/status.go +++ b/client/status.go @@ -50,13 +50,13 @@ func (p *bytesProgressHistory) Update(currentVal int64) (bytesPerSecondAvg int64 p.lastChange = 
time.Now() } - if time.Now().Sub(p.lastChange) > 3*time.Second { + if time.Since(p.lastChange) > 3*time.Second { p.last = nil return 0, 0 } deltaV := currentVal - p.last.val - deltaT := time.Now().Sub(p.last.time) + deltaT := time.Since(p.last.time) rate := float64(deltaV) / deltaT.Seconds() factor := 0.3 @@ -81,15 +81,10 @@ type tui struct { func newTui() tui { return tui{ - replicationProgress: make(map[string]*bytesProgressHistory, 0), + replicationProgress: make(map[string]*bytesProgressHistory), } } -func (t *tui) moveCursor(x, y int) { - t.x += x - t.y += y -} - const INDENT_MULTIPLIER = 4 func (t *tui) moveLine(dl int, col int) { @@ -187,7 +182,10 @@ func runStatus(s *cli.Subcommand, args []string) error { defer resp.Body.Close() if resp.StatusCode != http.StatusOK { fmt.Fprintf(os.Stderr, "Received error response:\n") - io.CopyN(os.Stderr, resp.Body, 4096) + _, err := io.CopyN(os.Stderr, resp.Body, 4096) + if err != nil { + return err + } return errors.Errorf("exit") } if _, err := io.Copy(os.Stdout, resp.Body); err != nil { @@ -226,7 +224,7 @@ func runStatus(s *cli.Subcommand, args []string) error { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() go func() { - for _ = range ticker.C { + for range ticker.C { update() } }() @@ -277,7 +275,7 @@ func (t *tui) draw() { //Iterate over map in alphabetical order keys := make([]string, len(t.report)) i := 0 - for k, _ := range t.report { + for k := range t.report { keys[i] = k i++ } @@ -363,7 +361,7 @@ func (t *tui) renderReplicationReport(rep *report.Report, history *bytesProgress t.newline() } if !rep.WaitReconnectSince.IsZero() { - delta := rep.WaitReconnectUntil.Sub(time.Now()).Round(time.Second) + delta := time.Until(rep.WaitReconnectUntil).Round(time.Second) if rep.WaitReconnectUntil.IsZero() || delta > 0 { var until string if rep.WaitReconnectUntil.IsZero() { @@ -561,13 +559,6 @@ func rightPad(str string, length int, pad string) string { return str + times(pad, length-len(str)) } -func leftPad(str string, length int, pad string) string { - if len(str) > length { - return str[len(str)-length:] - } - return times(pad, length-len(str)) + str -} - var arrowPositions = `>\|/` // changeCount = 0 indicates stall / no progresss diff --git a/client/testcmd.go b/client/testcmd.go index 81136bb..5417c95 100644 --- a/client/testcmd.go +++ b/client/testcmd.go @@ -113,10 +113,8 @@ func runTestFilterCmd(subcommand *cli.Subcommand, args []string) error { } var testPlaceholderArgs struct { - action string - ds string - plv string - all bool + ds string + all bool } var testPlaceholder = &cli.Subcommand{ diff --git a/config/config_test.go b/config/config_test.go index d51f3f7..2c17740 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -39,6 +39,7 @@ func TestSampleConfigsAreParsedWithoutErrors(t *testing.T) { } // template must be a template/text template with a single '{{ . 
}}' as placehodler for val +//nolint[:deadcode,unused] func testValidConfigTemplate(t *testing.T, tmpl string, val string) *Config { tmp, err := template.New("master").Parse(tmpl) if err != nil { diff --git a/config/retentiongrid.go b/config/retentiongrid.go index 5a91bca..e3ca38b 100644 --- a/config/retentiongrid.go +++ b/config/retentiongrid.go @@ -31,10 +31,6 @@ func (i *RetentionInterval) KeepCount() int { const RetentionGridKeepCountAll int = -1 -type RetentionGrid struct { - intervals []RetentionInterval -} - func (t *RetentionIntervalList) UnmarshalYAML(u func(interface{}, bool) error) (err error) { var in string if err := u(&in, true); err != nil { diff --git a/daemon/control.go b/daemon/control.go index a19bd5a..9c5e0cb 100644 --- a/daemon/control.go +++ b/daemon/control.go @@ -89,7 +89,7 @@ func (j *controlJob) Run(ctx context.Context) { mux := http.NewServeMux() mux.Handle(ControlJobEndpointPProf, - requestLogger{log: log, handler: jsonRequestResponder{func(decoder jsonDecoder) (interface{}, error) { + requestLogger{log: log, handler: jsonRequestResponder{log, func(decoder jsonDecoder) (interface{}, error) { var msg PprofServerControlMsg err := decoder(&msg) if err != nil { @@ -100,19 +100,19 @@ func (j *controlJob) Run(ctx context.Context) { }}}) mux.Handle(ControlJobEndpointVersion, - requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) { + requestLogger{log: log, handler: jsonResponder{log, func() (interface{}, error) { return version.NewZreplVersionInformation(), nil }}}) mux.Handle(ControlJobEndpointStatus, // don't log requests to status endpoint, too spammy - jsonResponder{func() (interface{}, error) { + jsonResponder{log, func() (interface{}, error) { s := j.jobs.status() return s, nil }}) mux.Handle(ControlJobEndpointSignal, - requestLogger{log: log, handler: jsonRequestResponder{func(decoder jsonDecoder) (interface{}, error) { + requestLogger{log: log, handler: jsonRequestResponder{log, func(decoder jsonDecoder) (interface{}, error) { type reqT struct { Name string Op string @@ -153,7 +153,10 @@ outer: select { case <-ctx.Done(): log.WithError(ctx.Err()).Info("context done") - server.Shutdown(context.Background()) + err := server.Shutdown(context.Background()) + if err != nil { + log.WithError(err).Error("cannot shutdown server") + } break outer case err = <-served: if err != nil { @@ -167,33 +170,50 @@ outer: } type jsonResponder struct { + log Logger producer func() (interface{}, error) } func (j jsonResponder) ServeHTTP(w http.ResponseWriter, r *http.Request) { + logIoErr := func(err error) { + if err != nil { + j.log.WithError(err).Error("control handler io error") + } + } res, err := j.producer() if err != nil { + j.log.WithError(err).Error("control handler error") w.WriteHeader(http.StatusInternalServerError) - io.WriteString(w, err.Error()) + _, err = io.WriteString(w, err.Error()) + logIoErr(err) return } var buf bytes.Buffer err = json.NewEncoder(&buf).Encode(res) if err != nil { + j.log.WithError(err).Error("control handler json marshal error") w.WriteHeader(http.StatusInternalServerError) - io.WriteString(w, err.Error()) + _, err = io.WriteString(w, err.Error()) } else { - io.Copy(w, &buf) + _, err = io.Copy(w, &buf) } + logIoErr(err) } type jsonDecoder = func(interface{}) error type jsonRequestResponder struct { + log Logger producer func(decoder jsonDecoder) (interface{}, error) } func (j jsonRequestResponder) ServeHTTP(w http.ResponseWriter, r *http.Request) { + logIoErr := func(err error) { + if err != nil { + 
j.log.WithError(err).Error("control handler io error") + } + } + var decodeError error decoder := func(i interface{}) error { err := json.NewDecoder(r.Body).Decode(&i) @@ -205,22 +225,28 @@ func (j jsonRequestResponder) ServeHTTP(w http.ResponseWriter, r *http.Request) //If we had a decode error ignore output of producer and return error if decodeError != nil { w.WriteHeader(http.StatusBadRequest) - io.WriteString(w, decodeError.Error()) + _, err := io.WriteString(w, decodeError.Error()) + logIoErr(err) return } if producerErr != nil { + j.log.WithError(producerErr).Error("control handler error") w.WriteHeader(http.StatusInternalServerError) - io.WriteString(w, producerErr.Error()) + _, err := io.WriteString(w, producerErr.Error()) + logIoErr(err) return } var buf bytes.Buffer encodeErr := json.NewEncoder(&buf).Encode(res) if encodeErr != nil { + j.log.WithError(producerErr).Error("control handler json marhsal error") w.WriteHeader(http.StatusInternalServerError) - io.WriteString(w, encodeErr.Error()) + _, err := io.WriteString(w, encodeErr.Error()) + logIoErr(err) } else { - io.Copy(w, &buf) + _, err := io.Copy(w, &buf) + logIoErr(err) } } diff --git a/daemon/daemon.go b/daemon/daemon.go index b4236d3..94fd7a2 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -117,9 +117,7 @@ func newJobs() *jobs { } const ( - logJobField string = "job" - logTaskField string = "task" - logSubsysField string = "subsystem" + logJobField string = "job" ) func (s *jobs) wait() <-chan struct{} { diff --git a/daemon/job/active.go b/daemon/job/active.go index a5b5a94..a878f19 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -68,8 +68,7 @@ type activeSideTasks struct { func (a *ActiveSide) updateTasks(u func(*activeSideTasks)) activeSideTasks { a.tasksMtx.Lock() defer a.tasksMtx.Unlock() - var copy activeSideTasks - copy = a.tasks + copy := a.tasks if u == nil { return copy } diff --git a/daemon/job/job.go b/daemon/job/job.go index ec844a5..42fdfab 100644 --- a/daemon/job/job.go +++ b/daemon/job/job.go @@ -94,18 +94,21 @@ func (s *Status) UnmarshalJSON(in []byte) (err error) { var st SnapJobStatus err = json.Unmarshal(jobJSON, &st) s.JobSpecific = &st + case TypePull: fallthrough case TypePush: var st ActiveSideStatus err = json.Unmarshal(jobJSON, &st) s.JobSpecific = &st + case TypeSource: fallthrough case TypeSink: var st PassiveStatus err = json.Unmarshal(jobJSON, &st) s.JobSpecific = &st + case TypeInternal: // internal jobs do not report specifics default: diff --git a/daemon/logging/logging_formatters.go b/daemon/logging/logging_formatters.go index df09eac..7179676 100644 --- a/daemon/logging/logging_formatters.go +++ b/daemon/logging/logging_formatters.go @@ -161,10 +161,16 @@ func (f *LogfmtFormatter) Format(e *logger.Entry) ([]byte, error) { enc := logfmt.NewEncoder(&buf) if f.metadataFlags&MetadataTime != 0 { - enc.EncodeKeyval(FieldTime, e.Time) + err := enc.EncodeKeyval(FieldTime, e.Time) + if err != nil { + return nil, errors.Wrap(err, "logfmt: encode time") + } } if f.metadataFlags&MetadataLevel != 0 { - enc.EncodeKeyval(FieldLevel, e.Level) + err := enc.EncodeKeyval(FieldLevel, e.Level) + if err != nil { + return nil, errors.Wrap(err, "logfmt: encode level") + } } // at least try and put job and task in front @@ -181,8 +187,10 @@ func (f *LogfmtFormatter) Format(e *logger.Entry) ([]byte, error) { prefixed[pf] = true } - enc.EncodeKeyval(FieldMessage, e.Message) - + err := enc.EncodeKeyval(FieldMessage, e.Message) + if err != nil { + return nil, errors.Wrap(err, "logfmt: encode 
message") + } for k, v := range e.Fields { if !prefixed[k] { if err := logfmtTryEncodeKeyval(enc, k, v); err != nil { @@ -201,7 +209,10 @@ func logfmtTryEncodeKeyval(enc *logfmt.Encoder, field, value interface{}) error case nil: // ok return nil case logfmt.ErrUnsupportedValueType: - enc.EncodeKeyval(field, fmt.Sprintf("<%T>", value)) + err := enc.EncodeKeyval(field, fmt.Sprintf("<%T>", value)) + if err != nil { + return errors.Wrap(err, "cannot encode unsuuported value type Go type") + } return nil } return errors.Wrapf(err, "cannot encode field '%s'", field) diff --git a/daemon/logging/logging_outlets.go b/daemon/logging/logging_outlets.go index c2b8a01..2abe3e3 100644 --- a/daemon/logging/logging_outlets.go +++ b/daemon/logging/logging_outlets.go @@ -30,7 +30,10 @@ func (h WriterOutlet) WriteEntry(entry logger.Entry) error { return err } _, err = h.writer.Write(bytes) - h.writer.Write([]byte("\n")) + if err != nil { + return err + } + _, err = h.writer.Write([]byte("\n")) return err } @@ -94,8 +97,10 @@ func (h *TCPOutlet) outLoop(retryInterval time.Duration) { conn = nil } } - conn.SetWriteDeadline(time.Now().Add(retryInterval)) - _, err = io.Copy(conn, msg) + err = conn.SetWriteDeadline(time.Now().Add(retryInterval)) + if err == nil { + _, err = io.Copy(conn, msg) + } if err != nil { retry = time.Now().Add(retryInterval) conn.Close() diff --git a/daemon/pprof.go b/daemon/pprof.go index 6c96251..8a6ff1a 100644 --- a/daemon/pprof.go +++ b/daemon/pprof.go @@ -7,11 +7,12 @@ import ( "context" "net" "net/http/pprof" + + "github.com/zrepl/zrepl/daemon/job" ) type pprofServer struct { cc chan PprofServerControlMsg - state PprofServerControlMsg listener net.Listener } @@ -63,7 +64,14 @@ outer: mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) - go http.Serve(s.listener, mux) + go func() { + err := http.Serve(s.listener, mux) + if ctx.Err() != nil { + return + } else if err != nil { + job.GetLogger(ctx).WithError(err).Error("pprof server serve error") + } + }() continue } diff --git a/daemon/prometheus.go b/daemon/prometheus.go index 71601ff..3d805ed 100644 --- a/daemon/prometheus.go +++ b/daemon/prometheus.go @@ -65,17 +65,15 @@ func (j *prometheusJob) Run(ctx context.Context) { log.WithError(err).Error("cannot listen") } go func() { - select { - case <-ctx.Done(): - l.Close() - } + <-ctx.Done() + l.Close() }() mux := http.NewServeMux() mux.Handle("/metrics", promhttp.Handler()) err = http.Serve(l, mux) - if err != nil { + if err != nil && ctx.Err() == nil { log.WithError(err).Error("error while serving") } diff --git a/daemon/pruner/pruner.go b/daemon/pruner/pruner.go index a9983bb..948fbb2 100644 --- a/daemon/pruner/pruner.go +++ b/daemon/pruner/pruner.go @@ -86,19 +86,6 @@ type LocalPrunerFactory struct { promPruneSecs *prometheus.HistogramVec } -func checkContainsKeep1(rules []pruning.KeepRule) error { - if len(rules) == 0 { - return nil //No keep rules means keep all - ok - } - for _, e := range rules { - switch e.(type) { - case *pruning.KeepLastN: - return nil - } - } - return errors.New("sender keep rules must contain last_n or be empty so that the last snapshot is definitely kept") -} - func NewLocalPrunerFactory(in config.PruningLocal, promPruneSecs *prometheus.HistogramVec) (*LocalPrunerFactory, error) { rules, err := pruning.RulesFromConfig(in.Keep) if err != nil { diff --git a/daemon/snapper/snapper.go b/daemon/snapper/snapper.go 
index 8ba6ce8..a6dc268 100644 --- a/daemon/snapper/snapper.go +++ b/daemon/snapper/snapper.go @@ -203,7 +203,7 @@ func syncUp(a args, u updater) state { u(func(s *Snapper) { s.sleepUntil = syncPoint }) - t := time.NewTimer(syncPoint.Sub(time.Now())) + t := time.NewTimer(time.Until(syncPoint)) defer t.Stop() select { case <-t.C: @@ -307,7 +307,7 @@ func wait(a args, u updater) state { logFunc("enter wait-state after error") }) - t := time.NewTimer(sleepUntil.Sub(time.Now())) + t := time.NewTimer(time.Until(sleepUntil)) defer t.Stop() select { diff --git a/logger/datastructures.go b/logger/datastructures.go index f1a9cfc..474f6f7 100644 --- a/logger/datastructures.go +++ b/logger/datastructures.go @@ -67,7 +67,7 @@ func (l Level) Short() string { case Error: return "ERRO" default: - return fmt.Sprintf("%s", l) + return l.String() } } @@ -82,7 +82,7 @@ func (l Level) String() string { case Error: return "error" default: - return fmt.Sprintf("%s", string(l)) + return string(l) } } diff --git a/logger/logger.go b/logger/logger.go index b007267..a2bd647 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -66,7 +66,8 @@ func (l *loggerImpl) logInternalError(outlet Outlet, err string) { time.Now(), fields, } - l.outlets.GetLoggerErrorOutlet().WriteEntry(entry) + // ignore errors at this point (still better than panicking if the error is temporary) + _ = l.outlets.GetLoggerErrorOutlet().WriteEntry(entry) } func (l *loggerImpl) log(level Level, msg string) { diff --git a/logger/stderrlogger.go b/logger/stderrlogger.go index 30c9a3e..fbb234b 100644 --- a/logger/stderrlogger.go +++ b/logger/stderrlogger.go @@ -5,10 +5,6 @@ import ( "os" ) -type stderrLogger struct { - Logger -} - type stderrLoggerOutlet struct{} func (stderrLoggerOutlet) WriteEntry(entry Entry) error { diff --git a/pruning/keep_not_replicated.go b/pruning/keep_not_replicated.go index 955d9e6..e84a5c2 100644 --- a/pruning/keep_not_replicated.go +++ b/pruning/keep_not_replicated.go @@ -1,8 +1,6 @@ package pruning -type KeepNotReplicated struct { - forceConstructor struct{} -} +type KeepNotReplicated struct{} func (*KeepNotReplicated) KeepRule(snaps []Snapshot) (destroyList []Snapshot) { return filterSnapList(snaps, func(snapshot Snapshot) bool { diff --git a/pruning/pruning.go b/pruning/pruning.go index e903852..9f36581 100644 --- a/pruning/pruning.go +++ b/pruning/pruning.go @@ -22,7 +22,7 @@ type Snapshot interface { // The returned snapshot list is guaranteed to only contains elements of input parameter snaps func PruneSnapshots(snaps []Snapshot, keepRules []KeepRule) []Snapshot { - if keepRules == nil || len(keepRules) == 0 { + if len(keepRules) == 0 { return []Snapshot{} } diff --git a/replication/driver/errorclass_enumer.go b/replication/driver/errorclass_enumer.go index 0a56c0e..d85c4b4 100644 --- a/replication/driver/errorclass_enumer.go +++ b/replication/driver/errorclass_enumer.go @@ -6,9 +6,9 @@ import ( "fmt" ) -const _errorClassName = "errorClassUnknownerrorClassPermanenterrorClassTemporaryConnectivityRelated" +const _errorClassName = "errorClassPermanenterrorClassTemporaryConnectivityRelated" -var _errorClassIndex = [...]uint8{0, 17, 36, 74} +var _errorClassIndex = [...]uint8{0, 19, 57} func (i errorClass) String() string { if i < 0 || i >= errorClass(len(_errorClassIndex)-1) { @@ -17,12 +17,11 @@ func (i errorClass) String() string { return _errorClassName[_errorClassIndex[i]:_errorClassIndex[i+1]] } -var _errorClassValues = []errorClass{0, 1, 2} +var _errorClassValues = []errorClass{0, 1} var 
_errorClassNameToValueMap = map[string]errorClass{ - _errorClassName[0:17]: 0, - _errorClassName[17:36]: 1, - _errorClassName[36:74]: 2, + _errorClassName[0:19]: 0, + _errorClassName[19:57]: 1, } // errorClassString retrieves an enum value from the enum constants string name. diff --git a/replication/driver/replication_driver.go b/replication/driver/replication_driver.go index 2480a20..0c1cd8f 100644 --- a/replication/driver/replication_driver.go +++ b/replication/driver/replication_driver.go @@ -339,7 +339,7 @@ func (a *attempt) do(ctx context.Context, prev *attempt) { l := fmt.Sprintf(" %s => %v", i.cur.fs.ReportInfo().Name, prevNames) inconsistencyLines = append(inconsistencyLines, l) } - fmt.Fprintf(&msg, strings.Join(inconsistencyLines, "\n")) + fmt.Fprint(&msg, strings.Join(inconsistencyLines, "\n")) now := time.Now() a.planErr = newTimedError(errors.New(msg.String()), now) a.fss = nil @@ -552,17 +552,11 @@ func (s *step) report() *report.StepReport { return r } -type stepErrorReport struct { - err *timedError - step int -} - //go:generate enumer -type=errorClass type errorClass int const ( - errorClassUnknown errorClass = iota - errorClassPermanent + errorClassPermanent errorClass = iota errorClassTemporaryConnectivityRelated ) diff --git a/replication/driver/replication_driver_debug.go b/replication/driver/replication_driver_debug.go index 1220527..6b96f09 100644 --- a/replication/driver/replication_driver_debug.go +++ b/replication/driver/replication_driver_debug.go @@ -13,6 +13,7 @@ func init() { } } +//nolint[:deadcode,unused] func debug(format string, args ...interface{}) { if debugEnabled { fmt.Fprintf(os.Stderr, "repl: driver: %s\n", fmt.Sprintf(format, args...)) @@ -21,6 +22,7 @@ func debug(format string, args ...interface{}) { type debugFunc func(format string, args ...interface{}) +//nolint[:deadcode,unused] func debugPrefix(prefixFormat string, prefixFormatArgs ...interface{}) debugFunc { prefix := fmt.Sprintf(prefixFormat, prefixFormatArgs...) 
return func(format string, args ...interface{}) { diff --git a/replication/driver/replication_driver_test.go b/replication/driver/replication_driver_test.go index 4f22649..abf39f1 100644 --- a/replication/driver/replication_driver_test.go +++ b/replication/driver/replication_driver_test.go @@ -166,14 +166,14 @@ func TestReplication(t *testing.T) { reports := make([]*report.Report, len(fireAt)) for i := range fireAt { sleepUntil := begin.Add(fireAt[i]) - time.Sleep(sleepUntil.Sub(time.Now())) + time.Sleep(time.Until(sleepUntil)) reports[i] = getReport() // uncomment for viewing non-diffed results // t.Logf("report @ %6.4f:\n%s", fireAt[i].Seconds(), pretty.Sprint(reports[i])) } waitBegin := time.Now() wait(true) - waitDuration := time.Now().Sub(waitBegin) + waitDuration := time.Since(waitBegin) assert.True(t, waitDuration < 10*time.Millisecond, "%v", waitDuration) // and that's gratious prev, err := json.Marshal(reports[0]) diff --git a/replication/driver/replication_stepqueue_test.go b/replication/driver/replication_stepqueue_test.go index fc0f316..73f877a 100644 --- a/replication/driver/replication_stepqueue_test.go +++ b/replication/driver/replication_stepqueue_test.go @@ -96,7 +96,7 @@ func TestPqConcurrent(t *testing.T) { pos := atomic.AddUint32(&globalCtr, 1) t := time.Unix(int64(step), 0) done := q.WaitReady(fs, t) - wakeAt := time.Now().Sub(begin) + wakeAt := time.Since(begin) time.Sleep(sleepTimePerStep) done() recs = append(recs, record{fs, step, pos, wakeAt}) diff --git a/rpc/dataconn/dataconn_debug.go b/rpc/dataconn/dataconn_debug.go index 3c20701..958ac80 100644 --- a/rpc/dataconn/dataconn_debug.go +++ b/rpc/dataconn/dataconn_debug.go @@ -13,6 +13,7 @@ func init() { } } +//nolint[:deadcode,unused] func debug(format string, args ...interface{}) { if debugEnabled { fmt.Fprintf(os.Stderr, "rpc/dataconn: %s\n", fmt.Sprintf(format, args...)) diff --git a/rpc/dataconn/dataconn_server.go b/rpc/dataconn/dataconn_server.go index 0fb1dc3..fbd1a95 100644 --- a/rpc/dataconn/dataconn_server.go +++ b/rpc/dataconn/dataconn_server.go @@ -138,7 +138,6 @@ func (s *Server) serveConn(nc *transport.AuthConn) { default: s.log.WithField("endpoint", endpoint).Error("unknown endpoint") handlerErr = fmt.Errorf("requested endpoint does not exist") - return } s.log.WithField("endpoint", endpoint).WithField("errType", fmt.Sprintf("%T", handlerErr)).Debug("handler returned") @@ -188,6 +187,4 @@ func (s *Server) serveConn(nc *transport.AuthConn) { s.log.WithError(err).Error("cannot write send stream") } } - - return } diff --git a/rpc/dataconn/frameconn/frameconn.go b/rpc/dataconn/frameconn/frameconn.go index 07615c5..321e6cb 100644 --- a/rpc/dataconn/frameconn/frameconn.go +++ b/rpc/dataconn/frameconn/frameconn.go @@ -1,7 +1,6 @@ package frameconn import ( - "bufio" "encoding/binary" "errors" "fmt" @@ -48,7 +47,6 @@ func (f *FrameHeader) Unmarshal(buf []byte) { type Conn struct { readMtx, writeMtx sync.Mutex nc timeoutconn.Conn - ncBuf *bufio.ReadWriter readNextValid bool readNext FrameHeader nextReadErr error diff --git a/rpc/dataconn/frameconn/frameconn_shutdown_fsm.go b/rpc/dataconn/frameconn/frameconn_shutdown_fsm.go index 0821f75..f727337 100644 --- a/rpc/dataconn/frameconn/frameconn_shutdown_fsm.go +++ b/rpc/dataconn/frameconn/frameconn_shutdown_fsm.go @@ -10,17 +10,11 @@ type shutdownFSM struct { type shutdownFSMState uint32 const ( + // zero value is important shutdownStateOpen shutdownFSMState = iota shutdownStateBegin ) -func newShutdownFSM() *shutdownFSM { - fsm := &shutdownFSM{ - state: 
shutdownStateOpen, - } - return fsm -} - func (f *shutdownFSM) Begin() (thisCallStartedShutdown bool) { f.mtx.Lock() defer f.mtx.Unlock() diff --git a/rpc/dataconn/heartbeatconn/heartbeatconn.go b/rpc/dataconn/heartbeatconn/heartbeatconn.go index 2924fdc..c4a6e7a 100644 --- a/rpc/dataconn/heartbeatconn/heartbeatconn.go +++ b/rpc/dataconn/heartbeatconn/heartbeatconn.go @@ -11,9 +11,7 @@ import ( ) type Conn struct { - state state - // if not nil, opErr is returned for ReadFrame and WriteFrame (not for Close, though) - opErr atomic.Value // error + state state fc *frameconn.Conn sendInterval, timeout time.Duration stopSend chan struct{} @@ -97,7 +95,10 @@ func (c *Conn) sendHeartbeats() { debug("send heartbeat") // if the connection is in zombie mode (aka iptables DROP inbetween peers) // this call or one of its successors will block after filling up the kernel tx buffer - c.fc.WriteFrame([]byte{}, heartbeat) + err := c.fc.WriteFrame([]byte{}, heartbeat) + if err != nil { + debug("send heartbeat error: %s", err) + } // ignore errors from WriteFrame to rate-limit SendHeartbeat retries c.lastFrameSent.Store(time.Now()) }() diff --git a/rpc/dataconn/heartbeatconn/heartbeatconn_debug.go b/rpc/dataconn/heartbeatconn/heartbeatconn_debug.go index 6bdea8d..caa8884 100644 --- a/rpc/dataconn/heartbeatconn/heartbeatconn_debug.go +++ b/rpc/dataconn/heartbeatconn/heartbeatconn_debug.go @@ -13,6 +13,7 @@ func init() { } } +//nolint[:deadcode,unused] func debug(format string, args ...interface{}) { if debugEnabled { fmt.Fprintf(os.Stderr, "rpc/dataconn/heartbeatconn: %s\n", fmt.Sprintf(format, args...)) diff --git a/rpc/dataconn/stream/stream.go b/rpc/dataconn/stream/stream.go index 10e7ff0..4d3e900 100644 --- a/rpc/dataconn/stream/stream.go +++ b/rpc/dataconn/stream/stream.go @@ -28,6 +28,7 @@ func WithLogger(ctx context.Context, log Logger) context.Context { return context.WithValue(ctx, contextKeyLogger, log) } +//nolint[:deadcode,unused] func getLog(ctx context.Context) Logger { log, ok := ctx.Value(contextKeyLogger).(Logger) if !ok { diff --git a/rpc/dataconn/stream/stream_conn.go b/rpc/dataconn/stream/stream_conn.go index ce0c052..7b1b081 100644 --- a/rpc/dataconn/stream/stream_conn.go +++ b/rpc/dataconn/stream/stream_conn.go @@ -23,9 +23,8 @@ type Conn struct { // readMtx serializes read stream operations because we inherently only // support a single stream at a time over hc. - readMtx sync.Mutex - readClean bool - allowWriteStreamTo bool + readMtx sync.Mutex + readClean bool // writeMtx serializes write stream operations because we inherently only // support a single stream at a time over hc. 
@@ -95,7 +94,7 @@ func (c *Conn) ReadStreamedMessage(ctx context.Context, maxSize uint32, frameTyp }() err := readStream(c.frameReads, c.hc, w, frameType) c.readClean = isConnCleanAfterRead(err) - w.CloseWithError(readMessageSentinel) + _ = w.CloseWithError(readMessageSentinel) // always returns nil wg.Wait() if err != nil { return nil, err @@ -166,7 +165,7 @@ func (c *Conn) SendStream(ctx context.Context, src zfs.StreamCopier, frameType u var res writeStreamRes res.errStream, res.errConn = writeStream(ctx, c.hc, r, frameType) if w != nil { - w.CloseWithError(res.errStream) + _ = w.CloseWithError(res.errStream) // always returns nil } writeStreamErrChan <- res }() diff --git a/rpc/dataconn/stream/stream_debug.go b/rpc/dataconn/stream/stream_debug.go index c1e2ef9..e86f41a 100644 --- a/rpc/dataconn/stream/stream_debug.go +++ b/rpc/dataconn/stream/stream_debug.go @@ -13,6 +13,7 @@ func init() { } } +//nolint[:deadcode,unused] func debug(format string, args ...interface{}) { if debugEnabled { fmt.Fprintf(os.Stderr, "rpc/dataconn/stream: %s\n", fmt.Sprintf(format, args...)) diff --git a/rpc/dataconn/timeoutconn/internal/wireevaluator/wireevaluator_closewrite.go b/rpc/dataconn/timeoutconn/internal/wireevaluator/wireevaluator_closewrite.go index dc61c99..e2dddbd 100644 --- a/rpc/dataconn/timeoutconn/internal/wireevaluator/wireevaluator_closewrite.go +++ b/rpc/dataconn/timeoutconn/internal/wireevaluator/wireevaluator_closewrite.go @@ -52,9 +52,6 @@ func (CloseWrite) sender(wire transport.Wire) { log.Printf("closeErr=%T %s", closeErr, closeErr) }() - type opResult struct { - err error - } writeDone := make(chan struct{}, 1) go func() { close(writeDone) diff --git a/rpc/dataconn/timeoutconn/timeoutconn.go b/rpc/dataconn/timeoutconn/timeoutconn.go index 9e0a3bf..e0d41c7 100644 --- a/rpc/dataconn/timeoutconn/timeoutconn.go +++ b/rpc/dataconn/timeoutconn/timeoutconn.go @@ -95,7 +95,7 @@ restart: return n, err } var nCurRead int - nCurRead, err = c.Wire.Read(p[n:len(p)]) + nCurRead, err = c.Wire.Read(p[n:]) n += nCurRead if netErr, ok := err.(net.Error); ok && netErr.Timeout() && nCurRead > 0 { err = nil @@ -111,7 +111,7 @@ restart: return n, err } var nCurWrite int - nCurWrite, err = c.Wire.Write(p[n:len(p)]) + nCurWrite, err = c.Wire.Write(p[n:]) n += nCurWrite if netErr, ok := err.(net.Error); ok && netErr.Timeout() && nCurWrite > 0 { err = nil diff --git a/rpc/dataconn/timeoutconn/timeoutconn_test.go b/rpc/dataconn/timeoutconn/timeoutconn_test.go index c78896e..b4cce04 100644 --- a/rpc/dataconn/timeoutconn/timeoutconn_test.go +++ b/rpc/dataconn/timeoutconn/timeoutconn_test.go @@ -102,7 +102,7 @@ func TestNoPartialReadsDueToDeadline(t *testing.T) { // io.Copy will encounter a partial read, then wait ~50ms until the other 5 bytes are written // It is still going to fail with deadline err because it expects EOF n, err := io.Copy(&buf, bc) - readDuration := time.Now().Sub(beginRead) + readDuration := time.Since(beginRead) t.Logf("read duration=%s", readDuration) t.Logf("recv done n=%v err=%v", n, err) t.Logf("buf=%v", buf.Bytes()) @@ -153,7 +153,7 @@ func TestPartialWriteMockConn(t *testing.T) { buf := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} begin := time.Now() n, err := mc.Write(buf[:]) - duration := time.Now().Sub(begin) + duration := time.Since(begin) assert.NoError(t, err) assert.Equal(t, 5, n) assert.True(t, duration > 100*time.Millisecond) diff --git a/rpc/grpcclientidentity/authlistener_grpc_adaptor.go b/rpc/grpcclientidentity/authlistener_grpc_adaptor.go index 37ec12a..267516c 100644 --- 
a/rpc/grpcclientidentity/authlistener_grpc_adaptor.go +++ b/rpc/grpcclientidentity/authlistener_grpc_adaptor.go @@ -106,7 +106,7 @@ func NewInterceptors(logger Logger, clientIdentityKey interface{}) (unary grpc.U if !ok { panic("peer.FromContext expected to return a peer in grpc.UnaryServerInterceptor") } - logger.WithField("peer_addr", fmt.Sprintf("%s", p.Addr)).Debug("peer addr") + logger.WithField("peer_addr", p.Addr.String()).Debug("peer addr") a, ok := p.AuthInfo.(*authConnAuthType) if !ok { panic(fmt.Sprintf("NewInterceptors must be used in combination with grpc.NewTransportCredentials, but got auth type %T", p.AuthInfo)) diff --git a/rpc/grpcclientidentity/example/main.go b/rpc/grpcclientidentity/example/main.go index 3813683..e69fa6b 100644 --- a/rpc/grpcclientidentity/example/main.go +++ b/rpc/grpcclientidentity/example/main.go @@ -88,6 +88,9 @@ func server() { log := logger.NewStderrDebugLogger() srv, serve, err := grpchelper.NewServer(authListenerFactory, clientIdentityKey, log) + if err != nil { + onErr(err, "new server") + } svc := &greeter{"hello "} pdu.RegisterGreeterServer(srv, svc) diff --git a/rpc/grpcclientidentity/example/pdu/grpcauth.pb.go b/rpc/grpcclientidentity/example/pdu/grpcauth.pb.go index 35686d8..0fbf2cb 100644 --- a/rpc/grpcclientidentity/example/pdu/grpcauth.pb.go +++ b/rpc/grpcclientidentity/example/pdu/grpcauth.pb.go @@ -5,11 +5,10 @@ package pdu import ( fmt "fmt" - math "math" - proto "github.com/golang/protobuf/proto" context "golang.org/x/net/context" grpc "google.golang.org/grpc" + math "math" ) // Reference imports to suppress errors if they are not otherwise used. diff --git a/rpc/rpc_debug.go b/rpc/rpc_debug.go index a31e1f2..66f3da3 100644 --- a/rpc/rpc_debug.go +++ b/rpc/rpc_debug.go @@ -13,6 +13,7 @@ func init() { } } +//nolint[:deadcode,unused] func debug(format string, args ...interface{}) { if debugEnabled { fmt.Fprintf(os.Stderr, "rpc: %s\n", fmt.Sprintf(format, args...)) diff --git a/rpc/rpc_logging.go b/rpc/rpc_logging.go index e842f6e..3d52690 100644 --- a/rpc/rpc_logging.go +++ b/rpc/rpc_logging.go @@ -12,9 +12,6 @@ type contextKey int const ( contextKeyLoggers contextKey = iota - contextKeyGeneralLogger - contextKeyControlLogger - contextKeyDataLogger ) /// All fields must be non-nil diff --git a/rpc/rpc_server.go b/rpc/rpc_server.go index f0f0f6b..880c75b 100644 --- a/rpc/rpc_server.go +++ b/rpc/rpc_server.go @@ -34,8 +34,6 @@ type Server struct { dataServerServe serveFunc } -type serverContextKey int - type HandlerContextInterceptor func(ctx context.Context) context.Context // config must be valid (use its Validate function). 
diff --git a/rpc/transportmux/transportmux.go b/rpc/transportmux/transportmux.go index ad34eed..90ec48c 100644 --- a/rpc/transportmux/transportmux.go +++ b/rpc/transportmux/transportmux.go @@ -7,6 +7,7 @@ package transportmux import ( "context" + "fmt" "io" "net" @@ -142,7 +143,10 @@ func Demux(ctx context.Context, rawListener transport.AuthenticatedListener, lab continue } - rawConn.SetDeadline(time.Time{}) + err = rawConn.SetDeadline(time.Time{}) + if err != nil { + getLog(ctx).WithError(err).Error("cannot reset deadline") + } // blocking is intentional demuxListener.conns <- acceptRes{conn: rawConn, err: nil} } @@ -169,7 +173,12 @@ func (c labeledConnecter) Connect(ctx context.Context) (transport.Wire, error) { } if dl, ok := ctx.Deadline(); ok { - defer conn.SetDeadline(time.Time{}) + defer func() { + err := conn.SetDeadline(time.Time{}) + if err != nil { + getLog(ctx).WithError(err).Error("cannot reset deadline") + } + }() if err := conn.SetDeadline(dl); err != nil { closeConn(err) return nil, err diff --git a/rpc/versionhandshake/versionhandshake.go b/rpc/versionhandshake/versionhandshake.go index 952acec..01f07e3 100644 --- a/rpc/versionhandshake/versionhandshake.go +++ b/rpc/versionhandshake/versionhandshake.go @@ -157,7 +157,7 @@ func DoHandshakeCurrentVersion(conn net.Conn, deadline time.Time) *HandshakeErro const HandshakeMessageMaxLen = 16 * 4096 -func DoHandshakeVersion(conn net.Conn, deadline time.Time, version int) *HandshakeError { +func DoHandshakeVersion(conn net.Conn, deadline time.Time, version int) (rErr *HandshakeError) { ours := HandshakeMessage{ ProtocolVersion: version, Extensions: nil, @@ -167,8 +167,19 @@ func DoHandshakeVersion(conn net.Conn, deadline time.Time, version int) *Handsha return hsErr("could not encode protocol banner: %s", err) } - defer conn.SetDeadline(time.Time{}) - conn.SetDeadline(deadline) + err = conn.SetDeadline(deadline) + if err != nil { + return hsErr("could not set deadline for protocol banner handshake: %s", err) + } + defer func() { + if rErr != nil { + return + } + err := conn.SetDeadline(time.Time{}) + if err != nil { + rErr = hsErr("could not reset deadline after protocol banner handshake: %s", err) + } + }() _, err = io.Copy(conn, bytes.NewBuffer(hsb)) if err != nil { return hsErr("could not send protocol banner: %s", err) diff --git a/tlsconf/tlsconf.go b/tlsconf/tlsconf.go index b1cb554..7c8241b 100644 --- a/tlsconf/tlsconf.go +++ b/tlsconf/tlsconf.go @@ -80,7 +80,9 @@ func (l *ClientAuthListener) Accept() (tcpConn *net.TCPConn, tlsConn *tls.Conn, if err = tlsConn.Handshake(); err != nil { goto CloseAndErr } - tlsConn.SetDeadline(time.Time{}) + if err = tlsConn.SetDeadline(time.Time{}); err != nil { + goto CloseAndErr + } peerCerts = tlsConn.ConnectionState().PeerCertificates if len(peerCerts) < 1 { diff --git a/transport/tls/serve_tls.go b/transport/tls/serve_tls.go index c598b61..8b8360b 100644 --- a/transport/tls/serve_tls.go +++ b/transport/tls/serve_tls.go @@ -3,7 +3,6 @@ package tls import ( "context" "crypto/tls" - "crypto/x509" "fmt" "net" "time" @@ -15,13 +14,7 @@ import ( "github.com/zrepl/zrepl/transport" ) -type TLSListenerFactory struct { - address string - clientCA *x509.CertPool - serverCert tls.Certificate - handshakeTimeout time.Duration - clientCNs map[string]struct{} -} +type TLSListenerFactory struct{} func TLSListenerFactoryFromConfig(c *config.Global, in *config.TLSServe) (transport.AuthenticatedListenerFactory, error) { @@ -75,12 +68,21 @@ func (l tlsAuthListener) Accept(ctx context.Context) 
(*transport.AuthConn, error return nil, err } if _, ok := l.clientCNs[cn]; !ok { + log := transport.GetLogger(ctx) if dl, ok := ctx.Deadline(); ok { - defer tlsConn.SetDeadline(time.Time{}) - tlsConn.SetDeadline(dl) + defer func() { + err := tlsConn.SetDeadline(time.Time{}) + if err != nil { + log.WithError(err).Error("cannot clear connection deadline") + } + }() + err := tlsConn.SetDeadline(dl) + if err != nil { + log.WithError(err).WithField("deadline", dl).Error("cannot set connection deadline inherited from context") + } } if err := tlsConn.Close(); err != nil { - transport.GetLogger(ctx).WithError(err).Error("error closing connection with unauthorized common name") + log.WithError(err).Error("error closing connection with unauthorized common name") } return nil, fmt.Errorf("unauthorized client common name %q from %s", cn, tlsConn.RemoteAddr()) } diff --git a/util/bytecounter/bytecounter_reader.go b/util/bytecounter/bytecounter_reader.go new file mode 100644 index 0000000..fc1f7c5 --- /dev/null +++ b/util/bytecounter/bytecounter_reader.go @@ -0,0 +1,49 @@ +package bytecounter + +import ( + "io" + "sync/atomic" + "time" +) + +type ByteCounterReader struct { + reader io.ReadCloser + + // called & accessed synchronously during Read, no external access + cb func(full int64) + cbEvery time.Duration + lastCbAt time.Time + + // set atomically because it may be read by multiple threads + bytes int64 +} + +func NewByteCounterReader(reader io.ReadCloser) *ByteCounterReader { + return &ByteCounterReader{ + reader: reader, + } +} + +func (b *ByteCounterReader) SetCallback(every time.Duration, cb func(full int64)) { + b.cbEvery = every + b.cb = cb +} + +func (b *ByteCounterReader) Close() error { + return b.reader.Close() +} + +func (b *ByteCounterReader) Read(p []byte) (n int, err error) { + n, err = b.reader.Read(p) + full := atomic.AddInt64(&b.bytes, int64(n)) + now := time.Now() + if b.cb != nil && now.Sub(b.lastCbAt) > b.cbEvery { + b.cb(full) + b.lastCbAt = now + } + return n, err +} + +func (b *ByteCounterReader) Bytes() int64 { + return atomic.LoadInt64(&b.bytes) +} diff --git a/util/chainedio/chainedio_reader.go b/util/chainedio/chainedio_reader.go new file mode 100644 index 0000000..a7bdea6 --- /dev/null +++ b/util/chainedio/chainedio_reader.go @@ -0,0 +1,34 @@ +package chainedio + +import "io" + +type ChainedReader struct { + Readers []io.Reader + curReader int +} + +func NewChainedReader(reader ...io.Reader) *ChainedReader { + return &ChainedReader{ + Readers: reader, + curReader: 0, + } +} + +func (c *ChainedReader) Read(buf []byte) (n int, err error) { + + n = 0 + + for c.curReader < len(c.Readers) { + n, err = c.Readers[c.curReader].Read(buf) + if err == io.EOF { + c.curReader++ + continue + } + break + } + if c.curReader == len(c.Readers) { + err = io.EOF // actually, there was no gap + } + + return +} diff --git a/util/chunking.go b/util/chunking/chunking.go similarity index 98% rename from util/chunking.go rename to util/chunking/chunking.go index bfbb59d..cceb344 100644 --- a/util/chunking.go +++ b/util/chunking/chunking.go @@ -1,4 +1,4 @@ -package util +package chunking import ( "bytes" @@ -49,7 +49,7 @@ func (c *Unchunker) Read(b []byte) (n int, err error) { } - if c.remainingChunkBytes <= 0 { + if c.remainingChunkBytes == 0 { panic("internal inconsistency: c.remainingChunkBytes must be > 0") } if len(b) <= 0 { diff --git a/util/chunking_test.go b/util/chunking/chunking_test.go similarity index 98% rename from util/chunking_test.go rename to util/chunking/chunking_test.go index 
c0641ad..7514089 100644 --- a/util/chunking_test.go +++ b/util/chunking/chunking_test.go @@ -1,4 +1,4 @@ -package util +package chunking import ( "bytes" diff --git a/util/connlogger/connlogger.go b/util/connlogger/connlogger.go new file mode 100644 index 0000000..f355e5b --- /dev/null +++ b/util/connlogger/connlogger.go @@ -0,0 +1,67 @@ +package connlogger + +import ( + "net" + "os" +) + +type NetConnLogger struct { + net.Conn + ReadFile *os.File + WriteFile *os.File +} + +func NewNetConnLogger(conn net.Conn, readlog, writelog string) (l *NetConnLogger, err error) { + l = &NetConnLogger{ + Conn: conn, + } + flags := os.O_CREATE | os.O_WRONLY + if readlog != "" { + if l.ReadFile, err = os.OpenFile(readlog, flags, 0600); err != nil { + return + } + } + if writelog != "" { + if l.WriteFile, err = os.OpenFile(writelog, flags, 0600); err != nil { + return + } + } + return +} + +func (c *NetConnLogger) Read(buf []byte) (n int, err error) { + n, err = c.Conn.Read(buf) + if c.ReadFile != nil { + if _, writeErr := c.ReadFile.Write(buf[0:n]); writeErr != nil { + panic(writeErr) + } + } + return +} + +func (c *NetConnLogger) Write(buf []byte) (n int, err error) { + n, err = c.Conn.Write(buf) + if c.WriteFile != nil { + if _, writeErr := c.WriteFile.Write(buf[0:n]); writeErr != nil { + panic(writeErr) + } + } + return +} +func (c *NetConnLogger) Close() (err error) { + err = c.Conn.Close() + if err != nil { + return + } + if c.ReadFile != nil { + if err := c.ReadFile.Close(); err != nil { + panic(err) + } + } + if c.WriteFile != nil { + if err := c.WriteFile.Close(); err != nil { + panic(err) + } + } + return +} diff --git a/util/io.go b/util/io.go deleted file mode 100644 index 32e35ec..0000000 --- a/util/io.go +++ /dev/null @@ -1,144 +0,0 @@ -package util - -import ( - "io" - "net" - "os" - "sync/atomic" - "time" -) - -type NetConnLogger struct { - net.Conn - ReadFile *os.File - WriteFile *os.File -} - -func NewNetConnLogger(conn net.Conn, readlog, writelog string) (l *NetConnLogger, err error) { - l = &NetConnLogger{ - Conn: conn, - } - flags := os.O_CREATE | os.O_WRONLY - if readlog != "" { - if l.ReadFile, err = os.OpenFile(readlog, flags, 0600); err != nil { - return - } - } - if writelog != "" { - if l.WriteFile, err = os.OpenFile(writelog, flags, 0600); err != nil { - return - } - } - return -} - -func (c *NetConnLogger) Read(buf []byte) (n int, err error) { - n, err = c.Conn.Read(buf) - if c.WriteFile != nil { - if _, writeErr := c.ReadFile.Write(buf[0:n]); writeErr != nil { - panic(writeErr) - } - } - return -} - -func (c *NetConnLogger) Write(buf []byte) (n int, err error) { - n, err = c.Conn.Write(buf) - if c.ReadFile != nil { - if _, writeErr := c.WriteFile.Write(buf[0:n]); writeErr != nil { - panic(writeErr) - } - } - return -} -func (c *NetConnLogger) Close() (err error) { - err = c.Conn.Close() - if err != nil { - return - } - if c.ReadFile != nil { - if err := c.ReadFile.Close(); err != nil { - panic(err) - } - } - if c.WriteFile != nil { - if err := c.WriteFile.Close(); err != nil { - panic(err) - } - } - return -} - -type ChainedReader struct { - Readers []io.Reader - curReader int -} - -func NewChainedReader(reader ...io.Reader) *ChainedReader { - return &ChainedReader{ - Readers: reader, - curReader: 0, - } -} - -func (c *ChainedReader) Read(buf []byte) (n int, err error) { - - n = 0 - - for c.curReader < len(c.Readers) { - n, err = c.Readers[c.curReader].Read(buf) - if err == io.EOF { - c.curReader++ - continue - } - break - } - if c.curReader == len(c.Readers) { - err = io.EOF 
// actually, there was no gap - } - - return -} - -type ByteCounterReader struct { - reader io.ReadCloser - - // called & accessed synchronously during Read, no external access - cb func(full int64) - cbEvery time.Duration - lastCbAt time.Time - bytesSinceLastCb int64 - - // set atomically because it may be read by multiple threads - bytes int64 -} - -func NewByteCounterReader(reader io.ReadCloser) *ByteCounterReader { - return &ByteCounterReader{ - reader: reader, - } -} - -func (b *ByteCounterReader) SetCallback(every time.Duration, cb func(full int64)) { - b.cbEvery = every - b.cb = cb -} - -func (b *ByteCounterReader) Close() error { - return b.reader.Close() -} - -func (b *ByteCounterReader) Read(p []byte) (n int, err error) { - n, err = b.reader.Read(p) - full := atomic.AddInt64(&b.bytes, int64(n)) - now := time.Now() - if b.cb != nil && now.Sub(b.lastCbAt) > b.cbEvery { - b.cb(full) - b.lastCbAt = now - } - return n, err -} - -func (b *ByteCounterReader) Bytes() int64 { - return atomic.LoadInt64(&b.bytes) -} diff --git a/util/iocommand.go b/util/iocommand/iocommand.go similarity index 98% rename from util/iocommand.go rename to util/iocommand/iocommand.go index 31ff8cb..fa9887e 100644 --- a/util/iocommand.go +++ b/util/iocommand/iocommand.go @@ -1,4 +1,4 @@ -package util +package iocommand import ( "bytes" @@ -99,7 +99,7 @@ func (c *IOCommand) doWait(ctx context.Context) (err error) { if !ok { return } - time.Sleep(dl.Sub(time.Now())) + time.Sleep(time.Until(dl)) c.kill() c.Stdout.Close() c.Stdin.Close() diff --git a/util/contextflexibletimeout.go b/util/optionaldeadline/optionaldeadline.go similarity index 96% rename from util/contextflexibletimeout.go rename to util/optionaldeadline/optionaldeadline.go index 422318d..36f5045 100644 --- a/util/contextflexibletimeout.go +++ b/util/optionaldeadline/optionaldeadline.go @@ -1,4 +1,4 @@ -package util +package optionaldeadline import ( "context" @@ -54,7 +54,7 @@ func ContextWithOptionalDeadline(pctx context.Context) (ctx context.Context, enf } // Deadline in past? 
- sleepTime := deadline.Sub(time.Now()) + sleepTime := time.Until(deadline) if sleepTime <= 0 { rctx.m.Lock() rctx.err = context.DeadlineExceeded diff --git a/util/contextflexibletimeout_test.go b/util/optionaldeadline/optionaldeadline_test.go similarity index 65% rename from util/contextflexibletimeout_test.go rename to util/optionaldeadline/optionaldeadline_test.go index 4ff1dfd..299a476 100644 --- a/util/contextflexibletimeout_test.go +++ b/util/optionaldeadline/optionaldeadline_test.go @@ -1,4 +1,4 @@ -package util +package optionaldeadline import ( "context" @@ -7,6 +7,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/zrepl/zrepl/util/chainlock" ) func TestContextWithOptionalDeadline(t *testing.T) { @@ -15,19 +17,28 @@ func TestContextWithOptionalDeadline(t *testing.T) { cctx, enforceDeadline := ContextWithOptionalDeadline(ctx) begin := time.Now() - var receivedCancellation time.Time - var cancellationError error + var checker struct { + receivedCancellation time.Time + cancellationError error + timeout bool + mtx chainlock.L + } go func() { select { case <-cctx.Done(): - receivedCancellation = time.Now() - cancellationError = cctx.Err() + defer checker.mtx.Lock().Unlock() + checker.receivedCancellation = time.Now() + checker.cancellationError = cctx.Err() case <-time.After(600 * time.Millisecond): - t.Fatalf("should have been cancelled by deadline") + defer checker.mtx.Lock().Unlock() + checker.timeout = true } }() - time.Sleep(100 * time.Millisecond) - if !receivedCancellation.IsZero() { + defer checker.mtx.Lock().Unlock() + checker.mtx.DropWhile(func() { + time.Sleep(100 * time.Millisecond) + }) + if !checker.receivedCancellation.IsZero() { t.Fatalf("no enforcement means no cancellation") } require.Nil(t, cctx.Err(), "no error while not cancelled") @@ -38,11 +49,15 @@ func TestContextWithOptionalDeadline(t *testing.T) { // second call must be ignored, i.e. 
we expect the deadline to be at begin+200ms, not begin+400ms enforceDeadline(begin.Add(400 * time.Millisecond)) - time.Sleep(300 * time.Millisecond) // 100ms margin for scheduler - if receivedCancellation.Sub(begin) > 250*time.Millisecond { - t.Fatalf("cancellation is beyond acceptable scheduler latency") + checker.mtx.DropWhile(func() { + time.Sleep(300 * time.Millisecond) // 100ms margin for scheduler + }) + assert.False(t, checker.timeout, "test timeout") + receivedCancellationAfter := checker.receivedCancellation.Sub(begin) + if receivedCancellationAfter > 250*time.Millisecond { + t.Fatalf("cancellation is beyond acceptable scheduler latency: %s", receivedCancellationAfter) } - require.Equal(t, context.DeadlineExceeded, cancellationError) + require.Equal(t, context.DeadlineExceeded, checker.cancellationError) } func TestContextWithOptionalDeadlineNegativeDeadline(t *testing.T) { diff --git a/zfs/datasetpath_visitor.go b/zfs/datasetpath_visitor.go index b8c8e05..7facec9 100644 --- a/zfs/datasetpath_visitor.go +++ b/zfs/datasetpath_visitor.go @@ -108,8 +108,7 @@ func (t *datasetPathTree) WalkTopDown(parent []string, visitor DatasetPathsVisit func newDatasetPathTree(initialComps []string) (t *datasetPathTree) { t = &datasetPathTree{} - var cur *datasetPathTree - cur = t + cur := t for i, comp := range initialComps { cur.Component = comp cur.FilledIn = true diff --git a/zfs/replication_history.go b/zfs/replication_history.go index acc0a51..36663c3 100644 --- a/zfs/replication_history.go +++ b/zfs/replication_history.go @@ -31,6 +31,9 @@ func ZFSSetReplicationCursor(fs *DatasetPath, snapname string) (guid uint64, err return 0, errors.Wrap(err, "zfs: replication cursor: get snapshot createtxg") } snapGuid, err := strconv.ParseUint(propsSnap.Get("guid"), 10, 64) + if err != nil { + return 0, errors.Wrap(err, "zfs: replication cursor: parse snapshot guid") + } bookmarkPath := fmt.Sprintf("%s#%s", fs.ToString(), ReplicationCursorBookmarkName) propsBookmark, err := zfsGet(bookmarkPath, []string{"createtxg"}, sourceAny) _, bookmarkNotExistErr := err.(*DatasetDoesNotExist) diff --git a/zfs/versions.go b/zfs/versions.go index d211621..86850c0 100644 --- a/zfs/versions.go +++ b/zfs/versions.go @@ -3,13 +3,13 @@ package zfs import ( "bytes" "context" - "errors" "fmt" "io" "strconv" "strings" "time" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -17,7 +17,7 @@ type VersionType string const ( Bookmark VersionType = "bookmark" - Snapshot = "snapshot" + Snapshot VersionType = "snapshot" ) func (t VersionType) DelimiterChar() string { @@ -37,14 +37,14 @@ func (t VersionType) String() string { func DecomposeVersionString(v string) (fs string, versionType VersionType, name string, err error) { if len(v) < 3 { - err = errors.New(fmt.Sprintf("snapshot or bookmark name implausibly short: %s", v)) + err = fmt.Errorf("snapshot or bookmark name implausibly short: %s", v) return } snapSplit := strings.SplitN(v, "@", 2) bookmarkSplit := strings.SplitN(v, "#", 2) if len(snapSplit)*len(bookmarkSplit) != 2 { - err = errors.New(fmt.Sprintf("dataset cannot be snapshot and bookmark at the same time: %s", v)) + err = fmt.Errorf("dataset cannot be snapshot and bookmark at the same time: %s", v) return } @@ -122,12 +122,12 @@ func ZFSListFilesystemVersions(fs *DatasetPath, filter FilesystemVersionFilter) } if v.Guid, err = strconv.ParseUint(line[1], 10, 64); err != nil { - err = errors.New(fmt.Sprintf("cannot parse GUID: %s", err.Error())) + err = errors.Wrap(err, "cannot parse GUID") return 
} if v.CreateTXG, err = strconv.ParseUint(line[2], 10, 64); err != nil { - err = errors.New(fmt.Sprintf("cannot parse CreateTXG: %s", err.Error())) + err = errors.Wrap(err, "cannot parse CreateTXG") return } @@ -160,16 +160,9 @@ func ZFSDestroyFilesystemVersion(filesystem *DatasetPath, version *FilesystemVer datasetPath := version.ToAbsPath(filesystem) // Sanity check... - if strings.IndexAny(datasetPath, "@#") == -1 { - return fmt.Errorf("sanity check failed: no @ character found in dataset path: %s", datasetPath) + if !strings.ContainsAny(datasetPath, "@#") { + return fmt.Errorf("sanity check failed: no @ or # character found in %q", datasetPath) } - err = ZFSDestroy(datasetPath) - if err == nil { - return - } - - // Check for EBUSY, special meaning to us - return - + return ZFSDestroy(datasetPath) } diff --git a/zfs/zfs.go b/zfs/zfs.go index 257f509..fd40917 100644 --- a/zfs/zfs.go +++ b/zfs/zfs.go @@ -67,7 +67,6 @@ func (p *DatasetPath) TrimPrefix(prefix *DatasetPath) { for i := 0; i < newlen; i++ { p.comps[i] = oldcomps[prelen+i] } - return } func (p *DatasetPath) TrimNPrefixComps(n int) { @@ -251,7 +250,9 @@ func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []strin return } defer func() { - cmd.Wait() + // discard the error, this defer is only relevant if we return while parsing the output + // in which case we'll return an 'unexpected output' error and not the exit status + _ = cmd.Wait() }() s := bufio.NewScanner(stdout) @@ -283,7 +284,6 @@ func ZFSListChan(ctx context.Context, out chan ZFSListResult, properties []strin sendResult(nil, s.Err()) return } - return } func validateRelativeZFSVersion(s string) error { @@ -731,7 +731,7 @@ func ZFSRecv(ctx context.Context, fs string, streamCopier StreamCopier, opts Rec { vs, err := ZFSListFilesystemVersions(fsdp, nil) if err != nil { - err = fmt.Errorf("cannot list versions to rollback is required: %s", err) + return fmt.Errorf("cannot list versions for rollback for forced receive: %s", err) } for _, v := range vs { if v.Type == Snapshot { diff --git a/zfs/zfs_debug.go b/zfs/zfs_debug.go index 32846e4..575bb31 100644 --- a/zfs/zfs_debug.go +++ b/zfs/zfs_debug.go @@ -13,6 +13,7 @@ func init() { } } +//nolint[:deadcode,unused] func debug(format string, args ...interface{}) { if debugEnabled { fmt.Fprintf(os.Stderr, "zfs: %s\n", fmt.Sprintf(format, args...))