mirror of
synced 2025-02-18 03:11:55 +01:00
Go 1.18 deprecated net.Error.Temporary(). This commit cleans up places where we use it incorrectly. Also, the rpc layer defines some errors that implement interface { Temporary() bool }. I added comments to all of the implementations to indicate whether they will be required if net.Error.Temporary is ever removed in the future. For HandshakeError, the Temporary() return value is actually important. I moved & rewrote a (previously misplaced) comment there. The ReadStreamError changes were 1. necessary to pacify newer staticcheck and 2. technically, an error can implement Temporary() without being a net.Error. This applies to some syscall errors in the standard library. Reading list for those interested: - https://github.com/golang/go/issues/45729 - https://groups.google.com/g/golang-nuts/c/-JcZzOkyqYI - https://man7.org/linux/man-pages/man2/accept.2.html Note: This change was prompted by staticcheck: > SA1019: neterr.Temporary has been deprecated since Go 1.18 because it > shouldn't be used: Temporary errors are not well-defined. Most > "temporary" errors are timeouts, and the few exceptions are surprising. > Do not use this method. (staticcheck)
842 lines
24 KiB
842 lines
24 KiB
package driver
import (
// interval is a time window [begin, end].
// The zero value represents an interval on which Set has not been called
// (or which was reset with SetZero). A zero end together with a non-zero
// begin represents an interval of indefinite length.
type interval struct {
	begin time.Time
	end   time.Time
}

// SetZero resets w to the zero value.
func (w *interval) SetZero() {
	w.begin = time.Time{}
	w.end = time.Time{}
}

// Set starts the interval at begin and makes it last for duration.
// A duration of 0 means indefinite length: End returns the zero time and
// ContextWithDeadlineAtEnd does not impose a deadline.
// Set panics if begin is the zero time, because a zero begin is what marks
// an interval that has not been Set.
func (w *interval) Set(begin time.Time, duration time.Duration) {
	if begin.IsZero() {
		panic("zero begin time not allowed")
	}
	w.begin = begin
	if duration == 0 {
		// Indefinite length. begin.Add(0) would yield begin itself, i.e. a
		// non-zero end that ContextWithDeadlineAtEnd would turn into an
		// already-expired deadline.
		w.end = time.Time{}
		return
	}
	w.end = begin.Add(duration)
}

// End returns the end of the interval if it has a defined length.
// For indefinite lengths, it returns the zero value.
func (w *interval) End() time.Time {
	return w.end
}

// ContextWithDeadlineAtEnd returns a context derived from ctx with a deadline
// at the interval's end. If the interval has indefinite length (duration 0 on
// Set), ctx is returned as is. The returned context.CancelFunc can be called
// either way.
// It panics if Set has not been called since construction / SetZero.
func (w *interval) ContextWithDeadlineAtEnd(ctx context.Context) (context.Context, context.CancelFunc) {
	if w.begin.IsZero() {
		panic("must call Set before ContextWithDeadlineAtEnd")
	}
	if w.end.IsZero() {
		// indefinite length, just return context as is
		return ctx, func() {}
	}
	return context.WithDeadline(ctx, w.end)
}
type run struct {
l *chainlock.L
startedAt, finishedAt time.Time
waitReconnect interval
waitReconnectError *timedError
// the attempts attempted so far:
// All but the last in this slice must have finished with some errors.
// The last attempt may not be finished and may not have errors.
attempts []*attempt
type Planner interface {
Plan(context.Context) ([]FS, error)
WaitForConnectivity(context.Context) error
// an attempt represents a single planning & execution of fs replications
type attempt struct {
planner Planner
config Config
l *chainlock.L
startedAt, finishedAt time.Time
// after Planner.Plan was called, planErr and fss are mutually exclusive with regards to nil-ness
// if both are nil, it must be assumed that Planner.Plan is active
planErr *timedError
fss []*fs
// timedError pairs an error with the point in time at which it occurred.
type timedError struct {
	Err  error
	Time time.Time
}

// newTimedError returns a timedError for err observed at t.
// It panics if err is nil or t is the zero time.
func newTimedError(err error, t time.Time) *timedError {
	switch {
	case err == nil:
		panic("error must be non-nil")
	case t.IsZero():
		panic("t must be non-zero")
	}
	return &timedError{Err: err, Time: t}
}
func (e *timedError) IntoReportError() *report.TimedError {
if e == nil {
return nil
return report.NewTimedError(e.Err.Error(), e.Time)
type FS interface {
// Returns true if this FS and fs refer to the same filesystem returned
// by Planner.Plan in a previous attempt.
EqualToPreviousAttempt(fs FS) bool
// The returned steps are assumed to be dependent on exactly
// their direct predecessors in the returned list.
PlanFS(context.Context) ([]Step, error)
ReportInfo() *report.FilesystemInfo
type Step interface {
// Returns true iff the target snapshot is the same for this Step and other.
// We do not use TargetDate to avoid problems with wrong system time on
// snapshot creation.
// Implementations can assume that `other` is a step of the same filesystem,
// although maybe from a previous attempt.
// (`same` as defined by FS.EqualToPreviousAttempt)
// Note that TargetEquals should return true in a situation with one
// originally sent snapshot and a subsequent attempt's step that uses
// resumable send & recv.
TargetEquals(other Step) bool
TargetDate() time.Time
Step(context.Context) error
ReportInfo() *report.StepInfo
type fs struct {
fs FS
l *chainlock.L
blockedOn report.FsBlockedOn
// ordering relationship that must be maintained for initial replication
initialRepOrd struct {
parents, children []*fs
parentDidUpdate chan struct{}
planning struct {
waitingForStepQueue bool
done bool
err *timedError
// valid iff planning.done && planning.err == nil
planned struct {
// valid iff planning.done && planning.err == nil
stepErr *timedError
// all steps, in the order in which they must be completed
steps []*step
// index into steps, pointing at the step that is currently executing
// if step >= len(steps), no more work needs to be done
step int
type step struct {
l *chainlock.L
step Step
type (
	// ReportFunc returns a report of the run's current state.
	ReportFunc func() *report.Report
	// WaitFunc reports whether the run has ended; if block is true, it first
	// waits for the run to end.
	WaitFunc func(block bool) (done bool)
)
type Config struct {
StepQueueConcurrency int `validate:"gte=1"`
MaxAttempts int `validate:"eq=-1|gt=0"`
ReconnectHardFailTimeout time.Duration `validate:"gt=0"`
var validate = validator.New()
func (c Config) Validate() error {
return validate.Struct(c)
// caller must ensure config.Validate() == nil
func Do(ctx context.Context, config Config, planner Planner) (ReportFunc, WaitFunc) {
if err := config.Validate(); err != nil {
log := getLog(ctx)
l := chainlock.New()
run := &run{
l: l,
startedAt: time.Now(),
done := make(chan struct{})
go func() {
defer close(done)
defer run.l.Lock().Unlock()
log.Debug("begin run")
defer log.Debug("run ended")
var prev *attempt
mainLog := log
for ano := 0; ano < int(config.MaxAttempts); ano++ {
log := mainLog.WithField("attempt_number", ano)
log.Debug("start attempt")
run.waitReconnectError = nil
// do current attempt
cur := &attempt{
l: l,
startedAt: time.Now(),
planner: planner,
config: config,
run.attempts = append(run.attempts, cur)
run.l.DropWhile(func() {
cur.do(ctx, prev)
prev = cur
if ctx.Err() != nil {
log.WithError(ctx.Err()).Info("context error")
// error classification, bail out if done / permanent error
rep := cur.report()
log.WithField("attempt_state", rep.State).Debug("attempt state")
errRep := cur.errorReport()
if rep.State == report.AttemptDone {
if len(rep.Filesystems) == 0 {
log.Warn("no filesystems were considered for replication")
log.Debug("attempt completed")
mostRecentErr, mostRecentErrClass := errRep.MostRecent()
log.WithField("most_recent_err", mostRecentErr).WithField("most_recent_err_class", mostRecentErrClass).Debug("most recent error used for re-connect decision")
if mostRecentErr == nil {
// inconsistent reporting, let's bail out
log.WithField("attempt_state", rep.State).Warn("attempt does not report done but error report does not report errors, aborting run")
log.WithError(mostRecentErr.Err).Error("most recent error in this attempt")
shouldReconnect := mostRecentErrClass == errorClassTemporaryConnectivityRelated
log.WithField("reconnect_decision", shouldReconnect).Debug("reconnect decision made")
if shouldReconnect {
run.waitReconnect.Set(time.Now(), config.ReconnectHardFailTimeout)
log.WithField("deadline", run.waitReconnect.End()).Error("temporary connectivity-related error identified, start waiting for reconnect")
var connectErr error
var connectErrTime time.Time
run.l.DropWhile(func() {
ctx, cancel := run.waitReconnect.ContextWithDeadlineAtEnd(ctx)
defer cancel()
connectErr = planner.WaitForConnectivity(ctx)
connectErrTime = time.Now()
if connectErr == nil {
log.Error("reconnect successful") // same level as 'begin with reconnect' message above
} else {
run.waitReconnectError = newTimedError(connectErr, connectErrTime)
log.WithError(connectErr).Error("reconnecting failed, aborting run")
} else {
log.Error("most recent error cannot be solved by reconnecting, aborting run")
wait := func(block bool) bool {
if block {
select {
case <-done:
return true
return false
report := func() *report.Report {
defer run.l.Lock().Unlock()
return run.report()
return report, wait
func (a *attempt) do(ctx context.Context, prev *attempt) {
prevs := a.doGlobalPlanning(ctx, prev)
if prevs == nil {
a.doFilesystems(ctx, prevs)
// if no error occurs, returns a map that maps this attempt's a.fss to `prev`'s a.fss
func (a *attempt) doGlobalPlanning(ctx context.Context, prev *attempt) map[*fs]*fs {
ctx, endSpan := trace.WithSpan(ctx, "plan")
defer endSpan()
pfss, err := a.planner.Plan(ctx)
errTime := time.Now()
defer a.l.Lock().Unlock()
if err != nil {
a.planErr = newTimedError(err, errTime)
a.fss = nil
a.finishedAt = time.Now()
return nil
// a.fss != nil indicates that there was no planning error (see doc comment)
a.fss = make([]*fs, 0)
for _, pfs := range pfss {
fs := &fs{
fs: pfs,
l: a.l,
blockedOn: report.FsBlockedOnNothing,
fs.initialRepOrd.parentDidUpdate = make(chan struct{}, 1)
a.fss = append(a.fss, fs)
prevs := make(map[*fs]*fs)
prevFSs := make(map[*fs][]*fs, len(pfss))
if prev != nil {
debug("previous attempt has %d fss", len(a.fss))
for _, fs := range a.fss {
for _, prevFS := range prev.fss {
if fs.fs.EqualToPreviousAttempt(prevFS.fs) {
l := prevFSs[fs]
l = append(l, prevFS)
prevFSs[fs] = l
type inconsistency struct {
cur *fs
prevs []*fs
var inconsistencies []inconsistency
for cur, fss := range prevFSs {
if len(fss) > 1 {
inconsistencies = append(inconsistencies, inconsistency{cur, fss})
sort.SliceStable(inconsistencies, func(i, j int) bool {
return inconsistencies[i].cur.fs.ReportInfo().Name < inconsistencies[j].cur.fs.ReportInfo().Name
if len(inconsistencies) > 0 {
var msg strings.Builder
msg.WriteString("cannot determine filesystem correspondences between different attempts:\n")
var inconsistencyLines []string
for _, i := range inconsistencies {
var prevNames []string
for _, prev := range i.prevs {
prevNames = append(prevNames, prev.fs.ReportInfo().Name)
l := fmt.Sprintf(" %s => %v", i.cur.fs.ReportInfo().Name, prevNames)
inconsistencyLines = append(inconsistencyLines, l)
fmt.Fprint(&msg, strings.Join(inconsistencyLines, "\n"))
now := time.Now()
a.planErr = newTimedError(errors.New(msg.String()), now)
a.fss = nil
a.finishedAt = now
return nil
for cur, fss := range prevFSs {
if len(fss) > 0 {
prevs[cur] = fss[0]
// invariant: prevs contains an entry for each unambiguous correspondence
// build up parent-child relationship (FIXME (O(n^2), but who's going to have that many filesystems...))
mustDatasetPathOrPlanFail := func(fs string) *zfs.DatasetPath {
dp, err := zfs.NewDatasetPath(fs)
if err != nil {
now := time.Now()
a.planErr = newTimedError(errors.Wrapf(err, "%q", fs), now)
a.fss = nil
a.finishedAt = now
return nil
return dp
for _, f1 := range a.fss {
fs1 := mustDatasetPathOrPlanFail(f1.fs.ReportInfo().Name)
if fs1 == nil {
return nil
for _, f2 := range a.fss {
fs2 := mustDatasetPathOrPlanFail(f2.fs.ReportInfo().Name)
if fs2 == nil {
return nil
if fs1.HasPrefix(fs2) && !fs1.Equal(fs2) {
f1.initialRepOrd.parents = append(f1.initialRepOrd.parents, f2)
f2.initialRepOrd.children = append(f2.initialRepOrd.children, f1)
return prevs
func (a *attempt) doFilesystems(ctx context.Context, prevs map[*fs]*fs) {
ctx, endSpan := trace.WithSpan(ctx, "do-repl")
defer endSpan()
defer a.l.Lock().Unlock()
stepQueue := newStepQueue()
defer stepQueue.Start(a.config.StepQueueConcurrency)()
var fssesDone sync.WaitGroup
for _, f := range a.fss {
go func(f *fs) {
defer fssesDone.Done()
// avoid explosion of tasks with name f.report().Info.Name
ctx, endTask := trace.WithTaskAndSpan(ctx, "repl-fs", f.report().Info.Name)
defer endTask()
f.do(ctx, stepQueue, prevs[f])
f.l.HoldWhile(func() {
// every return from f means it's unblocked...
f.blockedOn = report.FsBlockedOnNothing
a.l.DropWhile(func() {
a.finishedAt = time.Now()
func (f *fs) debug(format string, args ...interface{}) {
debugPrefix("fs=%s", f.fs.ReportInfo().Name)(format, args...)
// wake up children that watch for f.{planning.{err,done},planned.{step,stepErr}}
func (f *fs) initialRepOrdWakeupChildren() {
var children []string
for _, c := range f.initialRepOrd.children {
// no locking required, c.fs does not change
children = append(children, c.fs.ReportInfo().Name)
f.debug("wakeup children %s", children)
for _, child := range f.initialRepOrd.children {
select {
// no locking required, child.initialRepOrd does not change
case child.initialRepOrd.parentDidUpdate <- struct{}{}:
func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
defer f.l.Lock().Unlock()
defer f.initialRepOrdWakeupChildren()
// get planned steps from replication logic
var psteps []Step
var errTime time.Time
var err error
f.blockedOn = report.FsBlockedOnPlanningStepQueue
f.l.DropWhile(func() {
// TODO hacky
// choose target time that is earlier than any snapshot, so fs planning is always prioritized
targetDate := time.Unix(0, 0)
defer pq.WaitReady(ctx, f, targetDate)()
f.l.HoldWhile(func() {
// transition before we call PlanFS
f.blockedOn = report.FsBlockedOnNothing
psteps, err = f.fs.PlanFS(ctx) // no shadow
errTime = time.Now() // no shadow
if err != nil {
f.planning.err = newTimedError(err, errTime)
for _, pstep := range psteps {
step := &step{
l: f.l,
step: pstep,
f.planned.steps = append(f.planned.steps, step)
// we're not done planning yet, f.planned.steps might still be changed by next block
// => don't set f.planning.done just yet
f.debug("initial len(fs.planned.steps) = %d", len(f.planned.steps))
// for not-first attempts that succeeded in planning, only allow fs.planned.steps
// up to and including the originally planned target snapshot
if prev != nil && prev.planning.done && prev.planning.err == nil {
f.debug("attempting to correlate plan with previous attempt to find out what is left to do")
// find the highest of the previously uncompleted steps for which we can also find a step
// in our current plan
prevUncompleted := prev.planned.steps[prev.planned.step:]
var target struct{ prev, cur int }
target.prev = -1
target.cur = -1
for p := len(prevUncompleted) - 1; p >= 0; p-- {
for q := len(f.planned.steps) - 1; q >= 0; q-- {
if prevUncompleted[p].step.TargetEquals(f.planned.steps[q].step) {
target.prev = p
target.cur = q
break out
if target.prev == -1 || target.cur == -1 {
f.debug("no correlation possible between previous attempt and this attempt's plan")
f.planning.err = newTimedError(fmt.Errorf("cannot correlate previously failed attempt to current plan"), time.Now())
f.planned.steps = f.planned.steps[0:target.cur]
f.debug("found correlation, new steps are len(fs.planned.steps) = %d", len(f.planned.steps))
} else {
f.debug("previous attempt does not exist or did not finish planning, no correlation possible, taking this attempt's plan as is")
// now we are done planning (f.planned.steps won't change from now on)
f.planning.done = true
// wait for parents' initial replication
f.blockedOn = report.FsBlockedOnParentInitialRepl
var parents []string
for _, p := range f.initialRepOrd.parents {
parents = append(parents, p.fs.ReportInfo().Name)
f.debug("wait for parents %s", parents)
for {
var initialReplicatingParentsWithErrors []string
allParentsPresentOnReceiver := true
f.l.DropWhile(func() {
for _, p := range f.initialRepOrd.parents {
p.l.HoldWhile(func() {
// (get the preconditions that allow us to inspect p.planned)
parentHasPlanningDone := p.planning.done && p.planning.err == nil
if !parentHasPlanningDone {
// if the parent couldn't be planned, we cannot know whether it needs initial replication
// or incremental replication => be conservative and assume it was initial replication
allParentsPresentOnReceiver = false
if p.planning.err != nil {
initialReplicatingParentsWithErrors = append(initialReplicatingParentsWithErrors, p.fs.ReportInfo().Name)
// now allowed to inspect p.planned
// if there are no steps to be done, the filesystem must exist on the receiving side
// (otherwise we'd replicate it, and there would be a step for that)
// (FIXME hardcoded initial replication policy, assuming the policy will always do _some_ initial replication)
parentHasNoSteps := len(p.planned.steps) == 0
// OR if it has completed at least one step
// (remember that .step points to the next step to be done)
// (TODO technically, we could make this step ready in the moment the recv-side
// dataset exists, i.e. after the first few megabytes of transferred data, but we'd have to ask the receiver for that -> poll ListFilesystems RPC)
parentHasTakenAtLeastOneSuccessfulStep := !parentHasNoSteps && p.planned.step >= 1
parentFirstStepIsIncremental := // no need to lock for .report() because step.l == it's fs.l
len(p.planned.steps) > 0 && p.planned.steps[0].report().IsIncremental()
f.debug("parentHasNoSteps=%v parentFirstStepIsIncremental=%v parentHasTakenAtLeastOneSuccessfulStep=%v",
parentHasNoSteps, parentFirstStepIsIncremental, parentHasTakenAtLeastOneSuccessfulStep)
parentPresentOnReceiver := parentHasNoSteps || parentFirstStepIsIncremental || parentHasTakenAtLeastOneSuccessfulStep
allParentsPresentOnReceiver = allParentsPresentOnReceiver && parentPresentOnReceiver // no shadow
if !parentPresentOnReceiver && p.planned.stepErr != nil {
initialReplicatingParentsWithErrors = append(initialReplicatingParentsWithErrors, p.fs.ReportInfo().Name)
if len(initialReplicatingParentsWithErrors) > 0 {
f.planned.stepErr = newTimedError(fmt.Errorf("parent(s) failed during initial replication: %s", initialReplicatingParentsWithErrors), time.Now())
if allParentsPresentOnReceiver {
break // good to go
// wait for wakeups from parents, then check again
// lock must not be held while waiting in order for reporting to work
f.l.DropWhile(func() {
select {
case <-ctx.Done():
f.planned.stepErr = newTimedError(ctx.Err(), time.Now())
case <-f.initialRepOrd.parentDidUpdate:
// loop
if f.planned.stepErr != nil {
f.debug("all parents ready, start replication %s", parents)
// do our steps
for i, s := range f.planned.steps {
// lock must not be held while executing step in order for reporting to work
f.l.DropWhile(func() {
// wait for parallel replication
targetDate := s.step.TargetDate()
f.l.HoldWhile(func() { f.blockedOn = report.FsBlockedOnReplStepQueue })
defer pq.WaitReady(ctx, f, targetDate)()
f.l.HoldWhile(func() { f.blockedOn = report.FsBlockedOnNothing })
// do the step
ctx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("%#v", s.step.ReportInfo()))
defer endSpan()
err, errTime = s.step.Step(ctx), time.Now() // no shadow
if err != nil {
f.planned.stepErr = newTimedError(err, errTime)
f.planned.step = i + 1 // fs.planned.step must be == len(fs.planned.steps) if all went OK
// caller must hold lock l
func (r *run) report() *report.Report {
report := &report.Report{
Attempts: make([]*report.AttemptReport, len(r.attempts)),
StartAt: r.startedAt,
FinishAt: r.finishedAt,
WaitReconnectSince: r.waitReconnect.begin,
WaitReconnectUntil: r.waitReconnect.end,
WaitReconnectError: r.waitReconnectError.IntoReportError(),
for i := range report.Attempts {
report.Attempts[i] = r.attempts[i].report()
return report
// caller must hold lock l
func (a *attempt) report() *report.AttemptReport {
r := &report.AttemptReport{
// State is set below
Filesystems: make([]*report.FilesystemReport, len(a.fss)),
StartAt: a.startedAt,
FinishAt: a.finishedAt,
PlanError: a.planErr.IntoReportError(),
for i := range r.Filesystems {
r.Filesystems[i] = a.fss[i].report()
var state report.AttemptState
if a.planErr == nil && a.fss == nil {
state = report.AttemptPlanning
} else if a.planErr != nil && a.fss == nil {
state = report.AttemptPlanningError
} else if a.planErr == nil && a.fss != nil {
if a.finishedAt.IsZero() {
state = report.AttemptFanOutFSs
} else {
fsWithError := false
for _, s := range r.Filesystems {
fsWithError = fsWithError || s.Error() != nil
state = report.AttemptDone
if fsWithError {
state = report.AttemptFanOutError
} else {
panic(fmt.Sprintf("attempt.planErr and attempt.fss must not both be != nil:\n%s\n%s", pretty.Sprint(a.planErr), pretty.Sprint(a.fss)))
r.State = state
return r
// caller must hold lock l
func (f *fs) report() *report.FilesystemReport {
state := report.FilesystemPlanningErrored
if f.planning.err == nil {
if f.planning.done {
if f.planned.stepErr != nil {
state = report.FilesystemSteppingErrored
} else if f.planned.step < len(f.planned.steps) {
state = report.FilesystemStepping
} else {
state = report.FilesystemDone
} else {
state = report.FilesystemPlanning
r := &report.FilesystemReport{
Info: f.fs.ReportInfo(),
State: state,
BlockedOn: f.blockedOn,
PlanError: f.planning.err.IntoReportError(),
StepError: f.planned.stepErr.IntoReportError(),
Steps: make([]*report.StepReport, len(f.planned.steps)),
CurrentStep: f.planned.step,
for i := range r.Steps {
r.Steps[i] = f.planned.steps[i].report()
return r
// caller must hold lock l
func (s *step) report() *report.StepReport {
r := &report.StepReport{
Info: s.step.ReportInfo(),
return r
//go:generate enumer -type=errorClass

// errorClass classifies an attempt's errors for the reconnect decision.
type errorClass int

const (
	// errorClassPermanent: reconnecting is not expected to help.
	errorClassPermanent errorClass = iota
	// errorClassTemporaryConnectivityRelated: network timeouts and gRPC
	// Unavailable statuses, which warrant waiting for a reconnect.
	errorClassTemporaryConnectivityRelated
)
type errorReport struct {
flattened []*timedError
// sorted DESCending by err time
byClass map[errorClass][]*timedError
// caller must hold lock l
func (a *attempt) errorReport() *errorReport {
r := &errorReport{}
if a.planErr != nil {
r.flattened = append(r.flattened, a.planErr)
for _, fs := range a.fss {
if fs.planning.done && fs.planning.err != nil {
r.flattened = append(r.flattened, fs.planning.err)
} else if fs.planning.done && fs.planned.stepErr != nil {
r.flattened = append(r.flattened, fs.planned.stepErr)
// build byClass
r.byClass = make(map[errorClass][]*timedError)
putClass := func(err *timedError, class errorClass) {
errs := r.byClass[class]
errs = append(errs, err)
r.byClass[class] = errs
for _, err := range r.flattened {
if neterr, ok := err.Err.(net.Error); ok && neterr.Timeout() {
putClass(err, errorClassTemporaryConnectivityRelated)
if st, ok := status.FromError(err.Err); ok && st.Code() == codes.Unavailable {
// technically, codes.Unavailable could be returned by the gRPC endpoint, indicating overload, etc.
// for now, let's assume it only happens for connectivity issues, as specified in
// https://grpc.io/grpc/core/md_doc_statuscodes.html
putClass(err, errorClassTemporaryConnectivityRelated)
putClass(err, errorClassPermanent)
for _, errs := range r.byClass {
sort.Slice(errs, func(i, j int) bool {
return errs[i].Time.After(errs[j].Time) // sort descendingly
return r
func (r *errorReport) AnyError() *timedError {
for _, err := range r.flattened {
if err != nil {
return err
return nil
func (r *errorReport) MostRecent() (err *timedError, errClass errorClass) {
for class, errs := range r.byClass {
// errs are sorted descendingly during construction
if len(errs) > 0 && (err == nil || errs[0].Time.After(err.Time)) {
err = errs[0]
errClass = class