zrepl/cmd/replication/internal/fsfsm/fsfsm.go

362 lines
7.7 KiB
Go
Raw Normal View History

package fsfsm
import (
"context"
"errors"
"fmt"
"io"
"math/bits"
"net"
"sync"
"time"
"github.com/zrepl/zrepl/cmd/replication/pdu"
. "github.com/zrepl/zrepl/cmd/replication/common"
)
type StepReport struct {
From, To string
Status string
Problem string
}
type FilesystemReplicationReport struct {
Filesystem string
Status string
Problem string
Completed,Pending []*StepReport
}
//go:generate stringer -type=FSReplicationState
type FSReplicationState uint
const (
FSReady FSReplicationState = 1 << iota
FSRetryWait
FSPermanentError
FSCompleted
)
func (s FSReplicationState) fsrsf() fsrsf {
idx := bits.TrailingZeros(uint(s))
if idx == bits.UintSize {
panic(s)
}
m := []fsrsf{
fsrsfReady,
fsrsfRetryWait,
nil,
nil,
}
return m[idx]
}
type FSReplication struct {
// lock protects all fields in this struct, but not the data behind pointers
lock sync.Mutex
state FSReplicationState
fs string
err error
retryWaitUntil time.Time
completed, pending []*FSReplicationStep
}
func (f *FSReplication) State() FSReplicationState {
f.lock.Lock()
defer f.lock.Unlock()
return f.state
}
type FSReplicationBuilder struct {
r *FSReplication
}
func BuildFSReplication(fs string) *FSReplicationBuilder {
return &FSReplicationBuilder{&FSReplication{fs: fs}}
}
func (b *FSReplicationBuilder) AddStep(from, to FilesystemVersion) *FSReplicationBuilder {
step := &FSReplicationStep{
state: StepReady,
fsrep: b.r,
from: from,
to: to,
}
b.r.pending = append(b.r.pending, step)
return b
}
func (b *FSReplicationBuilder) Done() (r *FSReplication) {
if len(b.r.pending) > 0 {
b.r.state = FSReady
} else {
b.r.state = FSCompleted
}
r = b.r
b.r = nil
return r
}
func NewFSReplicationWithPermanentError(fs string, err error) *FSReplication {
return &FSReplication{
state: FSPermanentError,
fs: fs,
err: err,
}
}
//go:generate stringer -type=FSReplicationStepState
type FSReplicationStepState uint
const (
StepReady FSReplicationStepState = 1 << iota
StepRetry
StepPermanentError
StepCompleted
)
type FilesystemVersion interface {
SnapshotTime() time.Time
RelName() string
}
type FSReplicationStep struct {
// only protects state, err
// from, to and fsrep are assumed to be immutable
lock sync.Mutex
state FSReplicationStepState
from, to FilesystemVersion
fsrep *FSReplication
// both retry and permanent error
err error
}
func (f *FSReplication) TakeStep(ctx context.Context, ep EndpointPair) (post FSReplicationState, nextStepDate time.Time) {
var u fsrUpdater = func(fu func(*FSReplication)) FSReplicationState {
f.lock.Lock()
defer f.lock.Unlock()
if fu != nil {
fu(f)
}
return f.state
}
var s fsrsf = u(nil).fsrsf()
pre := u(nil)
preTime := time.Now()
s = s(ctx, ep, u)
delta := time.Now().Sub(preTime)
post = u(func(f *FSReplication) {
if len(f.pending) == 0 {
return
}
nextStepDate = f.pending[0].to.SnapshotTime()
})
GetLogger(ctx).
WithField("fs", f.fs).
WithField("transition", fmt.Sprintf("%s => %s", pre, post)).
WithField("duration", delta).
Debug("fsr step taken")
return post, nextStepDate
}
type fsrUpdater func(func(fsr *FSReplication)) FSReplicationState
type fsrsf func(ctx context.Context, ep EndpointPair, u fsrUpdater) fsrsf
func fsrsfReady(ctx context.Context, ep EndpointPair, u fsrUpdater) fsrsf {
var current *FSReplicationStep
s := u(func(f *FSReplication) {
if len(f.pending) == 0 {
f.state = FSCompleted
return
}
current = f.pending[0]
})
if s != FSReady {
return s.fsrsf()
}
stepState := current.do(ctx, ep)
return u(func(f *FSReplication) {
switch stepState {
case StepCompleted:
f.completed = append(f.completed, current)
f.pending = f.pending[1:]
if len(f.pending) > 0 {
f.state = FSReady
} else {
f.state = FSCompleted
}
case StepRetry:
f.retryWaitUntil = time.Now().Add(10 * time.Second) // FIXME make configurable
f.state = FSRetryWait
case StepPermanentError:
f.state = FSPermanentError
f.err = errors.New("a replication step failed with a permanent error")
default:
panic(f)
}
}).fsrsf()
}
func fsrsfRetryWait(ctx context.Context, ep EndpointPair, u fsrUpdater) fsrsf {
var sleepUntil time.Time
u(func(f *FSReplication) {
sleepUntil = f.retryWaitUntil
})
t := time.NewTimer(sleepUntil.Sub(time.Now()))
defer t.Stop()
select {
case <-ctx.Done():
return u(func(f *FSReplication) {
f.state = FSPermanentError
f.err = ctx.Err()
}).fsrsf()
case <-t.C:
}
return u(func(f *FSReplication) {
f.state = FSReady
}).fsrsf()
}
// access to fsr's members must be exclusive
func (fsr *FSReplication) Report() *FilesystemReplicationReport {
fsr.lock.Lock()
defer fsr.lock.Unlock()
rep := FilesystemReplicationReport{
Filesystem: fsr.fs,
Status: fsr.state.String(),
}
if fsr.state&FSPermanentError != 0 {
rep.Problem = fsr.err.Error()
return &rep
}
rep.Completed = make([]*StepReport, len(fsr.completed))
for i := range fsr.completed {
rep.Completed[i] = fsr.completed[i].Report()
}
rep.Pending = make([]*StepReport, len(fsr.pending))
for i := range fsr.pending {
rep.Pending[i] = fsr.pending[i].Report()
}
return &rep
}
func (s *FSReplicationStep) do(ctx context.Context, ep EndpointPair) FSReplicationStepState {
fs := s.fsrep.fs
log := GetLogger(ctx).
WithField("filesystem", fs).
WithField("step", s.String())
updateStateError := func(err error) FSReplicationStepState {
s.lock.Lock()
defer s.lock.Unlock()
s.err = err
switch err {
case io.EOF:
fallthrough
case io.ErrUnexpectedEOF:
fallthrough
case io.ErrClosedPipe:
s.state = StepRetry
return s.state
}
if _, ok := err.(net.Error); ok {
s.state = StepRetry
return s.state
}
s.state = StepPermanentError
return s.state
}
updateStateCompleted := func() FSReplicationStepState {
s.lock.Lock()
defer s.lock.Unlock()
s.err = nil
s.state = StepCompleted
return s.state
}
var sr *pdu.SendReq
if s.from == nil {
sr = &pdu.SendReq{
Filesystem: fs,
From: s.to.RelName(), // FIXME fix protocol to use To, like zfs does internally
}
} else {
sr = &pdu.SendReq{
Filesystem: fs,
From: s.from.RelName(),
To: s.to.RelName(),
}
}
log.WithField("request", sr).Debug("initiate send request")
sres, sstream, err := ep.Sender().Send(ctx, sr)
if err != nil {
log.WithError(err).Error("send request failed")
return updateStateError(err)
}
if sstream == nil {
err := errors.New("send request did not return a stream, broken endpoint implementation")
return updateStateError(err)
}
rr := &pdu.ReceiveReq{
Filesystem: fs,
ClearResumeToken: !sres.UsedResumeToken,
}
log.WithField("request", rr).Debug("initiate receive request")
err = ep.Receiver().Receive(ctx, rr, sstream)
if err != nil {
log.WithError(err).Error("receive request failed (might also be error on sender)")
sstream.Close()
// This failure could be due to
// - an unexpected exit of ZFS on the sending side
// - an unexpected exit of ZFS on the receiving side
// - a connectivity issue
return updateStateError(err)
}
log.Info("receive finished")
return updateStateCompleted()
}
func (s *FSReplicationStep) String() string {
if s.from == nil { // FIXME: ZFS semantics are that to is nil on non-incremental send
return fmt.Sprintf("%s%s (full)", s.fsrep.fs, s.to.RelName())
} else {
return fmt.Sprintf("%s(%s => %s)", s.fsrep.fs, s.from, s.to.RelName())
}
}
func (step *FSReplicationStep) Report() *StepReport {
var from string // FIXME follow same convention as ZFS: to should be nil on full send
if step.from != nil {
from = step.from.RelName()
}
rep := StepReport{
From: from,
To: step.to.RelName(),
Status: step.state.String(),
}
return &rep
}