restructure and rename, making mainfsm the replication package itself

Christian Schwarz 2018-08-22 00:10:09 +02:00
parent 2f205d205b
commit 301c7b2dd5
17 changed files with 348 additions and 303 deletions

View File

@ -4,9 +4,10 @@
ROOT := github.com/zrepl/zrepl
SUBPKGS := cmd
SUBPKGS += cmd/replication
SUBPKGS += cmd/replication/internal/common
SUBPKGS += cmd/replication/internal/mainfsm
SUBPKGS += cmd/replication/internal/fsfsm
SUBPKGS += cmd/replication/fsrep
SUBPKGS += cmd/replication/pdu
SUBPKGS += cmd/replication/internal/queue
SUBPKGS += cmd/replication/internal/diff
SUBPKGS += logger util zfs
_TESTPKGS := $(ROOT) $(foreach p,$(SUBPKGS),$(ROOT)/$(p))

View File

@ -9,7 +9,6 @@ import (
"github.com/zrepl/zrepl/zfs"
"sync"
"github.com/zrepl/zrepl/cmd/replication"
"github.com/zrepl/zrepl/cmd/replication/common"
)
type LocalJob struct {
@ -148,7 +147,7 @@ outer:
j.mainTask.Enter("replicate")
rep := replication.NewReplication()
rep.Drive(ctx, common.NewEndpointPairPull(sender, receiver))
rep.Drive(ctx, sender, receiver)
j.mainTask.Finish()

View File

@ -14,7 +14,6 @@ import (
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/cmd/replication"
"github.com/zrepl/zrepl/cmd/replication/common"
)
type PullJob struct {
@ -30,7 +29,7 @@ type PullJob struct {
Debug JobDebugSettings
task *Task
rep replication.Replication
rep *replication.Replication
}
func parsePullJob(c JobParsingContext, name string, i map[string]interface{}) (j *PullJob, err error) {
@ -189,12 +188,12 @@ func (j *PullJob) doRun(ctx context.Context) {
}
ctx = common.ContextWithLogger(ctx, replicationLogAdaptor{j.task.Log().WithField("subsystem", "replication")})
ctx = replication.WithLogger(ctx, replicationLogAdaptor{j.task.Log().WithField("subsystem", "replication")})
ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{j.task.Log().WithField("subsystem", "rpc.protocol")})
ctx = context.WithValue(ctx, contextKeyLog, j.task.Log().WithField("subsystem", "rpc.endpoint"))
j.rep = replication.NewReplication()
j.rep.Drive(ctx, common.NewEndpointPairPull(sender, puller))
j.rep.Drive(ctx, sender, puller)
client.Close()
j.task.Finish()
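
Taken together, the job-side call pattern after this commit reduces to the following minimal sketch (doPull is a made-up helper; log, sender, and receiver are assumed to be set up as in the jobs above, this is not code from the commit):

package cmd

import (
	"context"

	"github.com/zrepl/zrepl/cmd/replication"
)

// doPull is a hypothetical helper showing the new API surface: the
// common.NewEndpointPairPull wrapper is gone, sender and receiver go
// straight to Drive, and the logger is attached via replication.WithLogger.
func doPull(ctx context.Context, log replication.Logger,
	sender replication.Sender, receiver replication.Receiver) *replication.Report {

	ctx = replication.WithLogger(ctx, log) // replaces common.ContextWithLogger
	rep := replication.NewReplication()    // now a concrete *Replication, not an interface
	rep.Drive(ctx, sender, receiver)       // blocks until replication has finished
	return rep.Report()                    // stable once Drive has returned
}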

View File

@ -2,7 +2,6 @@ package cmd
import (
"fmt"
"github.com/zrepl/zrepl/cmd/replication/common"
"github.com/zrepl/zrepl/cmd/replication/pdu"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/zfs"
@ -11,6 +10,7 @@ import (
"github.com/golang/protobuf/proto"
"bytes"
"context"
"github.com/zrepl/zrepl/cmd/replication"
)
type InitialReplPolicy string
@ -57,7 +57,7 @@ func (p *SenderEndpoint) ListFilesystemVersions(ctx context.Context, fs string)
return nil, err
}
if !pass {
return nil, common.NewFilteredError(fs)
return nil, replication.NewFilteredError(fs)
}
fsvs, err := zfs.ZFSListFilesystemVersions(dp, p.FilesystemVersionFilter)
if err != nil {
@ -80,7 +80,7 @@ func (p *SenderEndpoint) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes
return nil, nil, err
}
if !pass {
return nil, nil, common.NewFilteredError(r.Filesystem)
return nil, nil, replication.NewFilteredError(r.Filesystem)
}
stream, err := zfs.ZFSSend(r.Filesystem, r.From, r.To)
if err != nil {
@ -324,7 +324,7 @@ func (s RemoteEndpoint) Receive(ctx context.Context, r *pdu.ReceiveReq, sendStre
}
type HandlerAdaptor struct {
ep common.ReplicationEndpoint
ep replication.Endpoint
}
func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructured *bytes.Buffer, reqStream io.ReadCloser) (resStructured *bytes.Buffer, resStream io.ReadCloser, err error) {
@ -369,11 +369,16 @@ func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructu
case RPCSend:
sender, ok := a.ep.(replication.Sender)
if !ok {
goto Err
}
var req pdu.SendReq
if err := proto.Unmarshal(reqStructured.Bytes(), &req); err != nil {
return nil, nil, err
}
res, sendStream, err := a.ep.Send(ctx, &req)
res, sendStream, err := sender.Send(ctx, &req)
if err != nil {
return nil, nil, err
}
@ -385,11 +390,16 @@ func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructu
case RPCReceive:
receiver, ok := a.ep.(replication.Receiver)
if !ok {
goto Err
}
var req pdu.ReceiveReq
if err := proto.Unmarshal(reqStructured.Bytes(), &req); err != nil {
return nil, nil, err
}
err := a.ep.Receive(ctx, &req, reqStream)
err := receiver.Receive(ctx, &req, reqStream)
if err != nil {
return nil, nil, err
}
@ -399,8 +409,7 @@ func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructu
}
return bytes.NewBuffer(b), nil, err
default:
return nil, nil, errors.New("no handler for given endpoint")
}
Err:
return nil, nil, errors.New("no handler for given endpoint")
}
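
The handler no longer requires one all-encompassing ReplicationEndpoint; it asserts the capability it needs per RPC. A hedged sketch of that dispatch pattern (handleSend is a made-up name, not part of this commit):

package cmd

import (
	"context"
	"errors"
	"io"

	"github.com/zrepl/zrepl/cmd/replication"
	"github.com/zrepl/zrepl/cmd/replication/pdu"
)

// handleSend is hypothetical: an endpoint only has to implement the
// direction it actually serves (replication.Sender embeds Endpoint
// plus fsrep.Sender); otherwise the request is rejected.
func handleSend(ctx context.Context, ep replication.Endpoint,
	req *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) {

	sender, ok := ep.(replication.Sender)
	if !ok {
		return nil, nil, errors.New("no handler for given endpoint")
	}
	return sender.Send(ctx, req)
}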

View File

@ -1,45 +0,0 @@
package common
import (
"context"
"io"
"github.com/zrepl/zrepl/cmd/replication/pdu"
"github.com/zrepl/zrepl/logger"
)
type contextKey int
const (
contextKeyLog contextKey = iota
)
type Logger = logger.Logger
func ContextWithLogger(ctx context.Context, l Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, l)
}
func GetLogger(ctx context.Context) Logger {
l, ok := ctx.Value(contextKeyLog).(Logger)
if !ok {
l = logger.NewNullLogger()
}
return l
}
type ReplicationEndpoint interface {
// Does not include placeholder filesystems
ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error)
ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) // fix depS
Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error)
Receive(ctx context.Context, r *pdu.ReceiveReq, sendStream io.ReadCloser) error
}
type FilteredError struct{ fs string }
func NewFilteredError(fs string) *FilteredError {
return &FilteredError{fs}
}
func (f FilteredError) Error() string { return "endpoint does not allow access to filesystem " + f.fs }

View File

@ -0,0 +1,30 @@
package replication
import (
"github.com/zrepl/zrepl/logger"
"context"
"github.com/zrepl/zrepl/cmd/replication/fsrep"
)
type contextKey int
const (
contextKeyLog contextKey = iota
)
type Logger = logger.Logger
func WithLogger(ctx context.Context, l Logger) context.Context {
ctx = context.WithValue(ctx, contextKeyLog, l)
ctx = fsrep.WithLogger(ctx, l)
return ctx
}
func getLogger(ctx context.Context) Logger {
l, ok := ctx.Value(contextKeyLog).(Logger)
if !ok {
l = logger.NewNullLogger()
}
return l
}

View File

@ -1,4 +1,6 @@
package fsfsm
// Package fsrep implements replication of a single file system with existing versions
// from a sender to a receiver.
package fsrep
import (
"context"
@ -11,16 +13,44 @@ import (
"time"
"github.com/zrepl/zrepl/cmd/replication/pdu"
. "github.com/zrepl/zrepl/cmd/replication/common"
"github.com/zrepl/zrepl/logger"
)
type contextKey int
const (
contextKeyLogger contextKey = iota
)
type Logger = logger.Logger
func WithLogger(ctx context.Context, log Logger) context.Context {
return context.WithValue(ctx, contextKeyLogger, log)
}
func getLogger(ctx context.Context) Logger {
l, ok := ctx.Value(contextKeyLogger).(Logger)
if !ok {
l = logger.NewNullLogger()
}
return l
}
type Sender interface {
Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error)
}
type Receiver interface {
Receive(ctx context.Context, r *pdu.ReceiveReq, sendStream io.ReadCloser) error
}
type StepReport struct {
From, To string
Status string
Problem string
}
type FilesystemReplicationReport struct {
type Report struct {
Filesystem string
Status string
Problem string
@ -28,90 +58,90 @@ type FilesystemReplicationReport struct {
}
//go:generate stringer -type=FSReplicationState
type FSReplicationState uint
//go:generate stringer -type=State
type State uint
const (
FSReady FSReplicationState = 1 << iota
FSRetryWait
FSPermanentError
FSCompleted
Ready State = 1 << iota
RetryWait
PermanentError
Completed
)
func (s FSReplicationState) fsrsf() fsrsf {
func (s State) fsrsf() state {
idx := bits.TrailingZeros(uint(s))
if idx == bits.UintSize {
panic(s)
}
m := []fsrsf{
fsrsfReady,
fsrsfRetryWait,
m := []state{
stateReady,
stateRetryWait,
nil,
nil,
}
return m[idx]
}
type FSReplication struct {
type Replication struct {
// lock protects all fields in this struct, but not the data behind pointers
lock sync.Mutex
state FSReplicationState
state State
fs string
err error
retryWaitUntil time.Time
completed, pending []*FSReplicationStep
completed, pending []*ReplicationStep
}
func (f *FSReplication) State() FSReplicationState {
func (f *Replication) State() State {
f.lock.Lock()
defer f.lock.Unlock()
return f.state
}
type FSReplicationBuilder struct {
r *FSReplication
type ReplicationBuilder struct {
r *Replication
}
func BuildFSReplication(fs string) *FSReplicationBuilder {
return &FSReplicationBuilder{&FSReplication{fs: fs}}
func BuildReplication(fs string) *ReplicationBuilder {
return &ReplicationBuilder{&Replication{fs: fs}}
}
func (b *FSReplicationBuilder) AddStep(from, to FilesystemVersion) *FSReplicationBuilder {
step := &FSReplicationStep{
state: StepReady,
fsrep: b.r,
from: from,
to: to,
func (b *ReplicationBuilder) AddStep(from, to FilesystemVersion) *ReplicationBuilder {
step := &ReplicationStep{
state: StepReady,
parent: b.r,
from: from,
to: to,
}
b.r.pending = append(b.r.pending, step)
return b
}
func (b *FSReplicationBuilder) Done() (r *FSReplication) {
func (b *ReplicationBuilder) Done() (r *Replication) {
if len(b.r.pending) > 0 {
b.r.state = FSReady
b.r.state = Ready
} else {
b.r.state = FSCompleted
b.r.state = Completed
}
r = b.r
b.r = nil
return r
}
func NewFSReplicationWithPermanentError(fs string, err error) *FSReplication {
return &FSReplication{
state: FSPermanentError,
func NewReplicationWithPermanentError(fs string, err error) *Replication {
return &Replication{
state: PermanentError,
fs: fs,
err: err,
}
}
//go:generate stringer -type=FSReplicationStepState
type FSReplicationStepState uint
//go:generate stringer -type=StepState
type StepState uint
const (
StepReady FSReplicationStepState = 1 << iota
StepReady StepState = 1 << iota
StepRetry
StepPermanentError
StepCompleted
@ -122,22 +152,22 @@ type FilesystemVersion interface {
RelName() string
}
type FSReplicationStep struct {
type ReplicationStep struct {
// only protects state, err
// from, to and fsrep are assumed to be immutable
// from, to and parent are assumed to be immutable
lock sync.Mutex
state FSReplicationStepState
state StepState
from, to FilesystemVersion
fsrep *FSReplication
parent *Replication
// both retry and permanent error
err error
}
func (f *FSReplication) TakeStep(ctx context.Context, sender, receiver ReplicationEndpoint) (post FSReplicationState, nextStepDate time.Time) {
func (f *Replication) TakeStep(ctx context.Context, sender Sender, receiver Receiver) (post State, nextStepDate time.Time) {
var u fsrUpdater = func(fu func(*FSReplication)) FSReplicationState {
var u updater = func(fu func(*Replication)) State {
f.lock.Lock()
defer f.lock.Unlock()
if fu != nil {
@ -145,20 +175,20 @@ func (f *FSReplication) TakeStep(ctx context.Context, sender, receiver Replicati
}
return f.state
}
var s fsrsf = u(nil).fsrsf()
var s state = u(nil).fsrsf()
pre := u(nil)
preTime := time.Now()
s = s(ctx, sender, receiver, u)
delta := time.Now().Sub(preTime)
post = u(func(f *FSReplication) {
post = u(func(f *Replication) {
if len(f.pending) == 0 {
return
}
nextStepDate = f.pending[0].to.SnapshotTime()
})
GetLogger(ctx).
getLogger(ctx).
WithField("fs", f.fs).
WithField("transition", fmt.Sprintf("%s => %s", pre, post)).
WithField("duration", delta).
@ -167,41 +197,41 @@ func (f *FSReplication) TakeStep(ctx context.Context, sender, receiver Replicati
return post, nextStepDate
}
type fsrUpdater func(func(fsr *FSReplication)) FSReplicationState
type updater func(func(fsr *Replication)) State
type fsrsf func(ctx context.Context, sender, receiver ReplicationEndpoint, u fsrUpdater) fsrsf
type state func(ctx context.Context, sender Sender, receiver Receiver, u updater) state
func fsrsfReady(ctx context.Context, sender, receiver ReplicationEndpoint, u fsrUpdater) fsrsf {
func stateReady(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
var current *FSReplicationStep
s := u(func(f *FSReplication) {
var current *ReplicationStep
s := u(func(f *Replication) {
if len(f.pending) == 0 {
f.state = FSCompleted
f.state = Completed
return
}
current = f.pending[0]
})
if s != FSReady {
if s != Ready {
return s.fsrsf()
}
stepState := current.do(ctx, sender, receiver)
return u(func(f *FSReplication) {
return u(func(f *Replication) {
switch stepState {
case StepCompleted:
f.completed = append(f.completed, current)
f.pending = f.pending[1:]
if len(f.pending) > 0 {
f.state = FSReady
f.state = Ready
} else {
f.state = FSCompleted
f.state = Completed
}
case StepRetry:
f.retryWaitUntil = time.Now().Add(10 * time.Second) // FIXME make configurable
f.state = FSRetryWait
f.state = RetryWait
case StepPermanentError:
f.state = FSPermanentError
f.state = PermanentError
f.err = errors.New("a replication step failed with a permanent error")
default:
panic(f)
@ -209,37 +239,36 @@ func fsrsfReady(ctx context.Context, sender, receiver ReplicationEndpoint, u fsr
}).fsrsf()
}
func fsrsfRetryWait(ctx context.Context, sender, receiver ReplicationEndpoint, u fsrUpdater) fsrsf {
func stateRetryWait(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
var sleepUntil time.Time
u(func(f *FSReplication) {
u(func(f *Replication) {
sleepUntil = f.retryWaitUntil
})
t := time.NewTimer(sleepUntil.Sub(time.Now()))
defer t.Stop()
select {
case <-ctx.Done():
return u(func(f *FSReplication) {
f.state = FSPermanentError
return u(func(f *Replication) {
f.state = PermanentError
f.err = ctx.Err()
}).fsrsf()
case <-t.C:
}
return u(func(f *FSReplication) {
f.state = FSReady
return u(func(f *Replication) {
f.state = Ready
}).fsrsf()
}
// access to fsr's members must be exclusive
func (fsr *FSReplication) Report() *FilesystemReplicationReport {
func (fsr *Replication) Report() *Report {
fsr.lock.Lock()
defer fsr.lock.Unlock()
rep := FilesystemReplicationReport{
rep := Report{
Filesystem: fsr.fs,
Status: fsr.state.String(),
}
if fsr.state&FSPermanentError != 0 {
if fsr.state&PermanentError != 0 {
rep.Problem = fsr.err.Error()
return &rep
}
@ -255,15 +284,15 @@ func (fsr *FSReplication) Report() *FilesystemReplicationReport {
return &rep
}
func (s *FSReplicationStep) do(ctx context.Context, sender, receiver ReplicationEndpoint) FSReplicationStepState {
func (s *ReplicationStep) do(ctx context.Context, sender Sender, receiver Receiver) StepState {
fs := s.fsrep.fs
fs := s.parent.fs
log := GetLogger(ctx).
log := getLogger(ctx).
WithField("filesystem", fs).
WithField("step", s.String())
updateStateError := func(err error) FSReplicationStepState {
updateStateError := func(err error) StepState {
s.lock.Lock()
defer s.lock.Unlock()
@ -285,7 +314,7 @@ func (s *FSReplicationStep) do(ctx context.Context, sender, receiver Replication
return s.state
}
updateStateCompleted := func() FSReplicationStepState {
updateStateCompleted := func() StepState {
s.lock.Lock()
defer s.lock.Unlock()
s.err = nil
@ -338,15 +367,15 @@ func (s *FSReplicationStep) do(ctx context.Context, sender, receiver Replication
}
func (s *FSReplicationStep) String() string {
func (s *ReplicationStep) String() string {
if s.from == nil { // FIXME: ZFS semantics are that to is nil on non-incremental send
return fmt.Sprintf("%s%s (full)", s.fsrep.fs, s.to.RelName())
return fmt.Sprintf("%s%s (full)", s.parent.fs, s.to.RelName())
} else {
return fmt.Sprintf("%s(%s => %s)", s.fsrep.fs, s.from, s.to.RelName())
return fmt.Sprintf("%s(%s => %s)", s.parent.fs, s.from, s.to.RelName())
}
}
func (step *FSReplicationStep) Report() *StepReport {
func (step *ReplicationStep) Report() *StepReport {
var from string // FIXME follow same convention as ZFS: to should be nil on full send
if step.from != nil {
from = step.from.RelName()
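
To see how the renamed pieces fit together, here is a hedged sketch of driving a single fsrep.Replication in isolation (replicateOne is made up; snapA/snapB are assumed FilesystemVersion values, and in zrepl itself the main state machine calls TakeStep through its queue rather than a loop like this):

package example

import (
	"context"

	"github.com/zrepl/zrepl/cmd/replication/fsrep"
)

// replicateOne is hypothetical: build the step list, then step the
// per-filesystem state machine until it reaches a terminal state.
func replicateOne(ctx context.Context, sender fsrep.Sender, receiver fsrep.Receiver,
	snapA, snapB fsrep.FilesystemVersion) string {

	fsr := fsrep.BuildReplication("zroot/data").
		AddStep(nil, snapA).   // nil "from" means a full send of snapA
		AddStep(snapA, snapB). // incremental snapA => snapB
		Done()

	for fsr.State()&(fsrep.Completed|fsrep.PermanentError) == 0 {
		fsr.TakeStep(ctx, sender, receiver) // Ready does a step, RetryWait sleeps
	}
	return fsr.Report().Status // e.g. "Completed" or "PermanentError"
}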

View File

@ -0,0 +1,29 @@
// Code generated by "stringer -type=State"; DO NOT EDIT.
package fsrep
import "strconv"
const (
_State_name_0 = "ReadyRetryWait"
_State_name_1 = "PermanentError"
_State_name_2 = "Completed"
)
var (
_State_index_0 = [...]uint8{0, 5, 14}
)
func (i State) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _State_name_0[_State_index_0[i]:_State_index_0[i+1]]
case i == 4:
return _State_name_1
case i == 8:
return _State_name_2
default:
return "State(" + strconv.FormatInt(int64(i), 10) + ")"
}
}

View File

@ -0,0 +1,29 @@
// Code generated by "stringer -type=StepState"; DO NOT EDIT.
package fsrep
import "strconv"
const (
_StepState_name_0 = "StepReadyStepRetry"
_StepState_name_1 = "StepPermanentError"
_StepState_name_2 = "StepCompleted"
)
var (
_StepState_index_0 = [...]uint8{0, 9, 18}
)
func (i StepState) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _StepState_name_0[_StepState_index_0[i]:_StepState_index_0[i+1]]
case i == 4:
return _StepState_name_1
case i == 8:
return _StepState_name_2
default:
return "StepState(" + strconv.FormatInt(int64(i), 10) + ")"
}
}

View File

@ -1,29 +0,0 @@
// Code generated by "stringer -type=FSReplicationState"; DO NOT EDIT.
package fsfsm
import "strconv"
const (
_FSReplicationState_name_0 = "FSReadyFSRetryWait"
_FSReplicationState_name_1 = "FSPermanentError"
_FSReplicationState_name_2 = "FSCompleted"
)
var (
_FSReplicationState_index_0 = [...]uint8{0, 7, 18}
)
func (i FSReplicationState) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _FSReplicationState_name_0[_FSReplicationState_index_0[i]:_FSReplicationState_index_0[i+1]]
case i == 4:
return _FSReplicationState_name_1
case i == 8:
return _FSReplicationState_name_2
default:
return "FSReplicationState(" + strconv.FormatInt(int64(i), 10) + ")"
}
}

View File

@ -1,29 +0,0 @@
// Code generated by "stringer -type=FSReplicationStepState"; DO NOT EDIT.
package fsfsm
import "strconv"
const (
_FSReplicationStepState_name_0 = "StepReadyStepRetry"
_FSReplicationStepState_name_1 = "StepPermanentError"
_FSReplicationStepState_name_2 = "StepCompleted"
)
var (
_FSReplicationStepState_index_0 = [...]uint8{0, 9, 18}
)
func (i FSReplicationStepState) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _FSReplicationStepState_name_0[_FSReplicationStepState_index_0[i]:_FSReplicationStepState_index_0[i+1]]
case i == 4:
return _FSReplicationStepState_name_1
case i == 8:
return _FSReplicationStepState_name_2
default:
return "FSReplicationStepState(" + strconv.FormatInt(int64(i), 10) + ")"
}
}

View File

@ -1,35 +0,0 @@
// Code generated by "stringer -type=ReplicationState"; DO NOT EDIT.
package mainfsm
import "strconv"
const (
_ReplicationState_name_0 = "PlanningPlanningError"
_ReplicationState_name_1 = "Working"
_ReplicationState_name_2 = "WorkingWait"
_ReplicationState_name_3 = "Completed"
_ReplicationState_name_4 = "ContextDone"
)
var (
_ReplicationState_index_0 = [...]uint8{0, 8, 21}
)
func (i ReplicationState) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _ReplicationState_name_0[_ReplicationState_index_0[i]:_ReplicationState_index_0[i+1]]
case i == 4:
return _ReplicationState_name_1
case i == 8:
return _ReplicationState_name_2
case i == 16:
return _ReplicationState_name_3
case i == 32:
return _ReplicationState_name_4
default:
return "ReplicationState(" + strconv.FormatInt(int64(i), 10) + ")"
}
}

View File

@ -4,17 +4,17 @@ import (
"time"
"sort"
. "github.com/zrepl/zrepl/cmd/replication/internal/fsfsm"
. "github.com/zrepl/zrepl/cmd/replication/fsrep"
)
type replicationQueueItem struct {
retriesSinceLastError int
// duplicates fsr.state to avoid accessing and locking fsr
state FSReplicationState
state State
// duplicates fsr.current.nextStepDate to avoid accessing & locking fsr
nextStepDate time.Time
fsr *FSReplication
fsr *Replication
}
type ReplicationQueue []*replicationQueueItem
@ -32,14 +32,14 @@ type lessmapEntry struct{
less func(a,b *replicationQueueItem) bool
}
var lessmap = map[FSReplicationState]lessmapEntry {
FSReady: {
var lessmap = map[State]lessmapEntry {
Ready: {
prio: 0,
less: func(a, b *replicationQueueItem) bool {
return a.nextStepDate.Before(b.nextStepDate)
},
},
FSRetryWait: {
RetryWait: {
prio: 1,
less: func(a, b *replicationQueueItem) bool {
return a.retriesSinceLastError < b.retriesSinceLastError
@ -66,10 +66,10 @@ func (q ReplicationQueue) Less(i, j int) bool {
return al.less(a, b)
}
func (q *ReplicationQueue) sort() (done []*FSReplication) {
func (q *ReplicationQueue) sort() (done []*Replication) {
// pre-scan for everything that is not ready
newq := make(ReplicationQueue, 0, len(*q))
done = make([]*FSReplication, 0, len(*q))
done = make([]*Replication, 0, len(*q))
for _, qitem := range *q {
if _, ok := lessmap[qitem.state]; !ok {
done = append(done, qitem.fsr)
@ -83,7 +83,7 @@ func (q *ReplicationQueue) sort() (done []*FSReplication) {
}
// next remains valid until the next call to GetNext()
func (q *ReplicationQueue) GetNext() (done []*FSReplication, next *ReplicationQueueItemHandle) {
func (q *ReplicationQueue) GetNext() (done []*Replication, next *ReplicationQueueItemHandle) {
done = q.sort()
if len(*q) == 0 {
return done, nil
@ -92,7 +92,7 @@ func (q *ReplicationQueue) GetNext() (done []*FSReplication, next *ReplicationQu
return done, next
}
func (q *ReplicationQueue) Add(fsr *FSReplication) {
func (q *ReplicationQueue) Add(fsr *Replication) {
*q = append(*q, &replicationQueueItem{
fsr: fsr,
state: fsr.State(),
@ -109,16 +109,16 @@ type ReplicationQueueItemHandle struct {
i *replicationQueueItem
}
func (h ReplicationQueueItemHandle) GetFSReplication() *FSReplication {
func (h ReplicationQueueItemHandle) GetFSReplication() *Replication {
return h.i.fsr
}
func (h ReplicationQueueItemHandle) Update(newState FSReplicationState, nextStepDate time.Time) {
func (h ReplicationQueueItemHandle) Update(newState State, nextStepDate time.Time) {
h.i.state = newState
h.i.nextStepDate = nextStepDate
if h.i.state&FSReady != 0 {
if h.i.state&Ready != 0 {
h.i.retriesSinceLastError = 0
} else if h.i.state&FSRetryWait != 0 {
} else if h.i.state&RetryWait != 0 {
h.i.retriesSinceLastError++
}
}
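
For orientation, a hedged sketch of how the main state machine consumes this queue (drainQueue is made up and simplified; the real stateWorking takes a single step per invocation and then re-evaluates):

package replication // hypothetical placement: the real consumer lives here

import (
	"context"

	"github.com/zrepl/zrepl/cmd/replication/fsrep"
	. "github.com/zrepl/zrepl/cmd/replication/internal/queue"
)

// drainQueue is hypothetical: pull the most urgent item (Ready items
// sort by nextStepDate, RetryWait items by retry count), step it, and
// feed the resulting state back so the queue can re-prioritize.
func drainQueue(ctx context.Context, q *ReplicationQueue,
	sender fsrep.Sender, receiver fsrep.Receiver) (done []*fsrep.Replication) {

	for {
		d, next := q.GetNext()
		done = append(done, d...) // items with no work left (Completed/PermanentError)
		if next == nil {
			return done
		}
		fsr := next.GetFSReplication()
		state, nextStepDate := fsr.TakeStep(ctx, sender, receiver)
		next.Update(state, nextStepDate)
	}
}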

View File

@ -1,4 +1,6 @@
package mainfsm
// Package replication implements replication of filesystems with existing
// versions (snapshots) from a sender to a receiver.
package replication
import (
"context"
@ -8,17 +10,17 @@ import (
"sync"
"time"
. "github.com/zrepl/zrepl/cmd/replication/common"
"github.com/zrepl/zrepl/cmd/replication/pdu"
"github.com/zrepl/zrepl/cmd/replication/internal/fsfsm"
. "github.com/zrepl/zrepl/cmd/replication/internal/mainfsm/queue"
"github.com/zrepl/zrepl/cmd/replication/fsrep"
. "github.com/zrepl/zrepl/cmd/replication/internal/queue"
. "github.com/zrepl/zrepl/cmd/replication/internal/diff"
)
//go:generate stringer -type=ReplicationState
type ReplicationState uint
//go:generate stringer -type=State
type State uint
const (
Planning ReplicationState = 1 << iota
Planning State = 1 << iota
PlanningError
Working
WorkingWait
@ -26,31 +28,35 @@ const (
ContextDone
)
func (s ReplicationState) rsf() replicationStateFunc {
func (s State) rsf() state {
idx := bits.TrailingZeros(uint(s))
if idx == bits.UintSize {
panic(s) // invalid value
}
m := []replicationStateFunc{
rsfPlanning,
rsfPlanningError,
rsfWorking,
rsfWorkingWait,
m := []state{
statePlanning,
statePlanningError,
stateWorking,
stateWorkingWait,
nil,
nil,
}
return m[idx]
}
// Replication implements the replication of multiple file systems from a Sender to a Receiver.
//
// It is a state machine that is driven by the Drive method
// and provides asynchronous reporting via the Report method (i.e. from another goroutine).
type Replication struct {
// lock protects all fields of this struct (but not the fields behind pointers!)
lock sync.Mutex
state ReplicationState
state State
// Working, WorkingWait, Completed, ContextDone
queue *ReplicationQueue
completed []*fsfsm.FSReplication
completed []*fsrep.Replication
active *ReplicationQueueItemHandle
// PlanningError
@ -66,9 +72,9 @@ type Replication struct {
type Report struct {
Status string
Problem string
Completed []*fsfsm.FilesystemReplicationReport
Pending []*fsfsm.FilesystemReplicationReport
Active *fsfsm.FilesystemReplicationReport
Completed []*fsrep.Report
Pending []*fsrep.Report
Active *fsrep.Report
}
@ -79,12 +85,45 @@ func NewReplication() *Replication {
return &r
}
type replicationUpdater func(func(*Replication)) (newState ReplicationState)
type replicationStateFunc func(ctx context.Context, sender, receiver ReplicationEndpoint, u replicationUpdater) replicationStateFunc
type Endpoint interface {
// Does not include placeholder filesystems
ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error)
ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) // fix depS
}
func (r *Replication) Drive(ctx context.Context, sender, receiver ReplicationEndpoint) {
type Sender interface {
Endpoint
fsrep.Sender
}
var u replicationUpdater = func(f func(*Replication)) ReplicationState {
type Receiver interface {
Endpoint
fsrep.Receiver
}
type FilteredError struct{ fs string }
func NewFilteredError(fs string) *FilteredError {
return &FilteredError{fs}
}
func (f FilteredError) Error() string { return "endpoint does not allow access to filesystem " + f.fs }
type updater func(func(*Replication)) (newState State)
type state func(ctx context.Context, sender Sender, receiver Receiver, u updater) state
// Drive starts the state machine and returns only after replication has finished (with or without errors).
// The Logger in ctx is used for both debug and error logging, but is not guaranteed to be stable
// or end-user friendly.
// User-facing replication progress reports can be obtained using the Report method,
// whose output will not change after Drive returns.
//
// FIXME: Drive may only be called once per instance of Replication
func (r *Replication) Drive(ctx context.Context, sender Sender, receiver Receiver) {
var u updater = func(f func(*Replication)) State {
r.lock.Lock()
defer r.lock.Unlock()
if f != nil {
@ -93,21 +132,21 @@ func (r *Replication) Drive(ctx context.Context, sender, receiver ReplicationEnd
return r.state
}
var s replicationStateFunc = rsfPlanning
var pre, post ReplicationState
var s state = statePlanning
var pre, post State
for s != nil {
preTime := time.Now()
pre = u(nil)
s = s(ctx, sender, receiver, u)
delta := time.Now().Sub(preTime)
post = u(nil)
GetLogger(ctx).
getLogger(ctx).
WithField("transition", fmt.Sprintf("%s => %s", pre, post)).
WithField("duration", delta).
Debug("main state transition")
}
GetLogger(ctx).
getLogger(ctx).
WithField("final_state", post).
Debug("main final state")
}
@ -133,11 +172,11 @@ func resolveConflict(conflict error) (path []*pdu.FilesystemVersion, msg string)
return nil, "no automated way to handle conflict type"
}
func rsfPlanning(ctx context.Context, sender, receiver ReplicationEndpoint, u replicationUpdater) replicationStateFunc {
func statePlanning(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
log := GetLogger(ctx)
log := getLogger(ctx)
handlePlanningError := func(err error) replicationStateFunc {
handlePlanningError := func(err error) state {
return u(func(r *Replication) {
r.planningError = err
r.state = PlanningError
@ -173,7 +212,7 @@ func rsfPlanning(ctx context.Context, sender, receiver ReplicationEndpoint, u re
if len(sfsvs) <= 1 {
err := errors.New("sender does not have any versions")
log.Error(err.Error())
q.Add(fsfsm.NewFSReplicationWithPermanentError(fs.Path, err))
q.Add(fsrep.NewReplicationWithPermanentError(fs.Path, err))
continue
}
@ -212,11 +251,11 @@ func rsfPlanning(ctx context.Context, sender, receiver ReplicationEndpoint, u re
}
}
if path == nil {
q.Add(fsfsm.NewFSReplicationWithPermanentError(fs.Path, conflict))
q.Add(fsrep.NewReplicationWithPermanentError(fs.Path, conflict))
continue
}
fsrfsm := fsfsm.BuildFSReplication(fs.Path)
fsrfsm := fsrep.BuildReplication(fs.Path)
if len(path) == 1 {
fsrfsm.AddStep(nil, path[0])
} else {
@ -236,7 +275,7 @@ func rsfPlanning(ctx context.Context, sender, receiver ReplicationEndpoint, u re
}).rsf()
}
func rsfPlanningError(ctx context.Context, sender, receiver ReplicationEndpoint, u replicationUpdater) replicationStateFunc {
func statePlanningError(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
sleepTime := 10 * time.Second
u(func(r *Replication) {
r.sleepUntil = time.Now().Add(sleepTime)
@ -256,7 +295,7 @@ func rsfPlanningError(ctx context.Context, sender, receiver ReplicationEndpoint,
}
}
func rsfWorking(ctx context.Context, sender, receiver ReplicationEndpoint, u replicationUpdater) replicationStateFunc {
func stateWorking(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
var active *ReplicationQueueItemHandle
rsfNext := u(func(r *Replication) {
@ -281,7 +320,7 @@ func rsfWorking(ctx context.Context, sender, receiver ReplicationEndpoint, u rep
}).rsf()
}
func rsfWorkingWait(ctx context.Context, sender, receiver ReplicationEndpoint, u replicationUpdater) replicationStateFunc {
func stateWorkingWait(ctx context.Context, sender Sender, receiver Receiver, u updater) state {
sleepTime := 10 * time.Second
u(func(r *Replication) {
r.sleepUntil = time.Now().Add(sleepTime)
@ -301,6 +340,9 @@ func rsfWorkingWait(ctx context.Context, sender, receiver ReplicationEndpoint, u
}
}
// Report provides a summary of the progress of the Replication,
// i.e., a condensed dump of the internal state machine.
// Report is safe to be called asynchronously while Drive is running.
func (r *Replication) Report() *Report {
r.lock.Lock()
defer r.lock.Unlock()
@ -319,10 +361,10 @@ func (r *Replication) Report() *Report {
return &rep
}
rep.Pending = make([]*fsfsm.FilesystemReplicationReport, 0, r.queue.Len())
rep.Completed = make([]*fsfsm.FilesystemReplicationReport, 0, len(r.completed)) // room for active (potentially)
rep.Pending = make([]*fsrep.Report, 0, r.queue.Len())
rep.Completed = make([]*fsrep.Report, 0, len(r.completed)) // room for active (potentially)
var active *fsfsm.FSReplication
var active *fsrep.Replication
if r.active != nil {
active = r.active.GetFSReplication()
rep.Active = active.Report()
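
The new doc comments pin down the concurrency contract: Drive blocks until replication is done, while Report may be called from another goroutine at any time. A hedged sketch of what that enables (monitorReplication is made up; ticker interval and output format are illustrative only):

package example

import (
	"context"
	"fmt"
	"time"

	"github.com/zrepl/zrepl/cmd/replication"
)

// monitorReplication is hypothetical: poll Report from a second
// goroutine while Drive blocks in the first.
func monitorReplication(ctx context.Context, sender replication.Sender, receiver replication.Receiver) {
	rep := replication.NewReplication()

	stop := make(chan struct{})
	go func() {
		t := time.NewTicker(1 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-stop:
				return
			case <-t.C:
				r := rep.Report()
				fmt.Printf("status=%s pending=%d completed=%d active=%v\n",
					r.Status, len(r.Pending), len(r.Completed), r.Active != nil)
			}
		}
	}()

	rep.Drive(ctx, sender, receiver) // returns only once replication has finished
	close(stop)                      // Report output is stable from here on
}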

View File

@ -1,19 +0,0 @@
package replication
import (
"context"
"github.com/zrepl/zrepl/cmd/replication/common"
"github.com/zrepl/zrepl/cmd/replication/internal/mainfsm"
)
type Report = mainfsm.Report
type Replication interface {
Drive(ctx context.Context, sender, receiver common.ReplicationEndpoint)
Report() *Report
}
func NewReplication() Replication {
return mainfsm.NewReplication()
}

View File

@ -0,0 +1,35 @@
// Code generated by "stringer -type=State"; DO NOT EDIT.
package replication
import "strconv"
const (
_State_name_0 = "PlanningPlanningError"
_State_name_1 = "Working"
_State_name_2 = "WorkingWait"
_State_name_3 = "Completed"
_State_name_4 = "ContextDone"
)
var (
_State_index_0 = [...]uint8{0, 8, 21}
)
func (i State) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _State_name_0[_State_index_0[i]:_State_index_0[i+1]]
case i == 4:
return _State_name_1
case i == 8:
return _State_name_2
case i == 16:
return _State_name_3
case i == 32:
return _State_name_4
default:
return "State(" + strconv.FormatInt(int64(i), 10) + ")"
}
}