mirror of
synced 2025-03-10 13:08:10 +01:00
WIP state-machine based replication
This commit is contained in:
@ -8,7 +8,7 @@ import (
type LocalJob struct {
@ -146,7 +146,7 @@ outer:
j.mainTask.Log().Debug("replicating from lhs to rhs")
replication.Replicate(ctx, replication.NewEndpointPairPull(sender, receiver))
replication.Replicate(ctx, replication.NewEndpointPairPull(sender, receiver), nil) // FIXME
@ -1,6 +1,7 @@
package cmd
import (
@ -12,7 +13,7 @@ import (
type PullJob struct {
@ -165,7 +166,10 @@ func (j *PullJob) doRun(ctx context.Context) {
client, err := streamrpc.NewClient(j.Connect, clientConf)
//client, err := streamrpc.NewClient(j.Connect, clientConf)
client, err := streamrpc.NewClient(&tcpConnecter{net.Dialer{
Timeout: 10*time.Second,
}}, clientConf)
defer client.Close()
@ -182,10 +186,26 @@ func (j *PullJob) doRun(ctx context.Context) {
usr2 := make(chan os.Signal)
defer close(usr2)
signal.Notify(usr2, syscall.SIGUSR2)
defer signal.Stop(usr2)
retryNow := make(chan struct{}, 1) // buffered so we don't leak the goroutine
go func() {
for {
sig := <-usr2
if sig != nil {
retryNow <- struct{}{}
} else {
ctx = replication.ContextWithLogger(ctx, replicationLogAdaptor{j.task.Log().WithField("subsystem", "replication")})
ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{j.task.Log().WithField("subsystem", "rpc.protocol")})
ctx = context.WithValue(ctx, contextKeyLog, j.task.Log().WithField("subsystem", "rpc.endpoint"))
replication.Replicate(ctx, replication.NewEndpointPairPull(sender, puller))
replication.Replicate(ctx, replication.NewEndpointPairPull(sender, puller), retryNow)
@ -146,7 +146,9 @@ func (j *SourceJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pru
func (j *SourceJob) serve(ctx context.Context, task *Task) {
listener, err := j.Serve.Listen()
//listener, err := j.Serve.Listen()
listener, err := net.Listen("tcp", ":8888")
if err != nil {
task.Log().WithError(err).Error("error listening")
@ -2,7 +2,7 @@ package cmd
import (
Normal file
Normal file
@ -0,0 +1,32 @@
// Code generated by "stringer -type=FSReplicationState"; DO NOT EDIT.
package replication
import "strconv"
const (
_FSReplicationState_name_0 = "FSQueuedFSActive"
_FSReplicationState_name_1 = "FSRetry"
_FSReplicationState_name_2 = "FSPermanentError"
_FSReplicationState_name_3 = "FSCompleted"
var (
_FSReplicationState_index_0 = [...]uint8{0, 8, 16}
func (i FSReplicationState) String() string {
switch {
case 1 <= i && i <= 2:
i -= 1
return _FSReplicationState_name_0[_FSReplicationState_index_0[i]:_FSReplicationState_index_0[i+1]]
case i == 4:
return _FSReplicationState_name_1
case i == 8:
return _FSReplicationState_name_2
case i == 16:
return _FSReplicationState_name_3
return "FSReplicationState(" + strconv.FormatInt(int64(i), 10) + ")"
Normal file
Normal file
@ -0,0 +1,16 @@
// Code generated by "stringer -type=FSReplicationStepState"; DO NOT EDIT.
package replication
import "strconv"
const _FSReplicationStepState_name = "StepPendingStepActiveStepRetryStepPermanentErrorStepCompleted"
var _FSReplicationStepState_index = [...]uint8{0, 11, 21, 30, 48, 61}
func (i FSReplicationStepState) String() string {
if i < 0 || i >= FSReplicationStepState(len(_FSReplicationStepState_index)-1) {
return "FSReplicationStepState(" + strconv.FormatInt(int64(i), 10) + ")"
return _FSReplicationStepState_name[_FSReplicationStepState_index[i]:_FSReplicationStepState_index[i+1]]
Normal file
Normal file
@ -0,0 +1,474 @@
package replication
import (
//go:generate stringer -type=ReplicationState
type ReplicationState int
const (
Planning ReplicationState = iota
type Replication struct {
state ReplicationState
// Working / WorkingWait
pending, completed []*FSReplication
// PlanningError
planningError error
// ContextDone
contextError error
type FSReplicationState int
//go:generate stringer -type=FSReplicationState
const (
FSQueued FSReplicationState = 1 << iota
type FSReplication struct {
state FSReplicationState
fs *Filesystem
permanentError error
retryAt time.Time
completed, pending []*FSReplicationStep
func newFSReplicationPermanentError(fs *Filesystem, err error) *FSReplication {
return &FSReplication{
state: FSPermanentError,
fs: fs,
permanentError: err,
type FSReplicationBuilder struct {
r *FSReplication
steps []*FSReplicationStep
func buildNewFSReplication(fs *Filesystem) *FSReplicationBuilder {
return &FSReplicationBuilder{
r: &FSReplication{
fs: fs,
pending: make([]*FSReplicationStep, 0),
func (b *FSReplicationBuilder) AddStep(from, to *FilesystemVersion) *FSReplication {
step := &FSReplicationStep{
state: StepPending,
fsrep: b.r,
from: from,
to: to,
b.r.pending = append(b.r.pending, step)
return b.r
func (b *FSReplicationBuilder) Complete() *FSReplication {
if len(b.r.pending) > 0 {
b.r.state = FSQueued
} else {
b.r.state = FSCompleted
r := b.r
return r
//go:generate stringer -type=FSReplicationStepState
type FSReplicationStepState int
const (
StepPending FSReplicationStepState = iota
type FSReplicationStep struct {
state FSReplicationStepState
from, to *FilesystemVersion
fsrep *FSReplication
// both retry and permanent error
err error
func (r *Replication) Drive(ctx context.Context, ep EndpointPair, retryNow chan struct{}) {
for !(r.state == Completed || r.state == ContextDone) {
pre := r.state
preTime := time.Now()
r.doDrive(ctx, ep, retryNow)
delta := time.Now().Sub(preTime)
post := r.state
WithField("transition", fmt.Sprintf("%s => %s", pre, post)).
WithField("duration", delta).
Debug("state transition")
func (r *Replication) doDrive(ctx context.Context, ep EndpointPair, retryNow chan struct{}) {
switch r.state {
case Planning:
r.tryBuildPlan(ctx, ep)
case PlanningError:
w := time.NewTimer(10 * time.Second) // FIXME constant make configurable
defer w.Stop()
select {
case <-ctx.Done():
r.state = ContextDone
r.contextError = ctx.Err()
case <-retryNow:
r.state = Planning
r.planningError = nil
case <-w.C:
r.state = Planning
r.planningError = nil
case Working:
if len(r.pending) == 0 {
r.state = Completed
sort.Slice(r.pending, func(i, j int) bool {
a, b := r.pending[i], r.pending[j]
statePrio := func(x *FSReplication) int {
if !(x.state == FSQueued || x.state == FSRetry) {
if x.state == FSQueued {
return 0
} else {
return 1
aprio, bprio := statePrio(a), statePrio(b)
if aprio != bprio {
return aprio < bprio
// now we know they are the same state
if a.state == FSQueued {
return a.nextStepDate().Before(b.nextStepDate())
if a.state == FSRetry {
return a.retryAt.Before(b.retryAt)
panic("should not be reached")
fsrep := r.pending[0]
if fsrep.state == FSRetry {
r.state = WorkingWait
if fsrep.state != FSQueued {
fsState := fsrep.takeStep(ctx, ep)
if fsState&(FSPermanentError|FSCompleted) != 0 {
r.pending = r.pending[1:]
r.completed = append(r.completed, fsrep)
case WorkingWait:
fsrep := r.pending[0]
w := time.NewTimer(fsrep.retryAt.Sub(time.Now()))
defer w.Stop()
select {
case <-ctx.Done():
r.state = ContextDone
r.contextError = ctx.Err()
case <-retryNow:
for _, fsr := range r.pending {
r.state = Working
case <-w.C:
fsrep.retryNow() // avoid timer jitter
r.state = Working
func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) ReplicationState {
log := getLogger(ctx)
planningError := func(err error) ReplicationState {
r.state = PlanningError
r.planningError = err
return r.state
done := func() ReplicationState {
r.state = Working
r.planningError = nil
return r.state
sfss, err := ep.Sender().ListFilesystems(ctx)
if err != nil {
log.WithError(err).Error("error listing sender filesystems")
return planningError(err)
rfss, err := ep.Receiver().ListFilesystems(ctx)
if err != nil {
log.WithError(err).Error("error listing receiver filesystems")
return planningError(err)
r.pending = make([]*FSReplication, 0, len(sfss))
r.completed = make([]*FSReplication, 0, len(sfss))
mainlog := log
for _, fs := range sfss {
log := mainlog.WithField("filesystem", fs.Path)
log.Info("assessing filesystem")
sfsvs, err := ep.Sender().ListFilesystemVersions(ctx, fs.Path)
if err != nil {
log.WithError(err).Error("cannot get remote filesystem versions")
return planningError(err)
if len(sfsvs) <= 1 {
err := errors.New("sender does not have any versions")
r.completed = append(r.completed, newFSReplicationPermanentError(fs, err))
receiverFSExists := false
for _, rfs := range rfss {
if rfs.Path == fs.Path {
receiverFSExists = true
var rfsvs []*FilesystemVersion
if receiverFSExists {
rfsvs, err = ep.Receiver().ListFilesystemVersions(ctx, fs.Path)
if err != nil {
if _, ok := err.(FilteredError); ok {
log.Info("receiver ignores filesystem")
log.WithError(err).Error("receiver error")
return planningError(err)
} else {
rfsvs = []*FilesystemVersion{}
path, conflict := IncrementalPath(rfsvs, sfsvs)
if conflict != nil {
var msg string
path, msg = resolveConflict(conflict) // no shadowing allowed!
if path != nil {
log.WithField("conflict", conflict).Info("conflict")
log.WithField("resolution", msg).Info("automatically resolved")
} else {
log.WithField("conflict", conflict).Error("conflict")
log.WithField("problem", msg).Error("cannot resolve conflict")
if path == nil {
r.completed = append(r.completed, newFSReplicationPermanentError(fs, conflict))
fsreplbuilder := buildNewFSReplication(fs)
if len(path) == 1 {
fsreplbuilder.AddStep(nil, path[0])
} else {
for i := 0; i < len(path)-1; i++ {
fsreplbuilder.AddStep(path[i], path[i+1])
fsrepl := fsreplbuilder.Complete()
switch fsrepl.state {
case FSCompleted:
r.completed = append(r.completed, fsreplbuilder.Complete())
case FSQueued:
r.pending = append(r.pending, fsreplbuilder.Complete())
return done()
func (f *FSReplication) nextStepDate() time.Time {
if f.state != FSQueued {
ct, err := f.pending[0].to.CreationAsTime()
if err != nil {
panic(err) // FIXME
return ct
func (f *FSReplication) takeStep(ctx context.Context, ep EndpointPair) FSReplicationState {
if f.state != FSQueued {
f.state = FSActive
step := f.pending[0]
stepState := step.do(ctx, ep)
switch stepState {
case StepCompleted:
f.pending = f.pending[1:]
f.completed = append(f.completed, step)
if len(f.pending) > 0 {
f.state = FSQueued
} else {
f.state = FSCompleted
case StepRetry:
f.state = FSRetry
f.retryAt = time.Now().Add(10 * time.Second) // FIXME hardcoded constant
case StepPermanentError:
f.state = FSPermanentError
return f.state
func (f *FSReplication) retryNow() {
if f.state != FSRetry {
f.retryAt = time.Time{}
f.state = FSQueued
func (s *FSReplicationStep) do(ctx context.Context, ep EndpointPair) FSReplicationStepState {
fs := s.fsrep.fs
log := getLogger(ctx).
WithField("filesystem", fs.Path).
WithField("step", s.String())
updateStateError := func(err error) FSReplicationStepState {
s.err = err
switch err {
case io.EOF: fallthrough
case io.ErrUnexpectedEOF: fallthrough
case io.ErrClosedPipe:
return StepRetry
if _, ok := err.(net.Error); ok {
return StepRetry
return StepPermanentError
updateStateCompleted := func() FSReplicationStepState {
s.err = nil
s.state = StepCompleted
return s.state
// FIXME refresh fs resume token
fs.ResumeToken = ""
var sr *SendReq
if fs.ResumeToken != "" {
sr = &SendReq{
Filesystem: fs.Path,
ResumeToken: fs.ResumeToken,
} else if s.from == nil {
sr = &SendReq{
Filesystem: fs.Path,
From: s.to.RelName(), // FIXME fix protocol to use To, like zfs does internally
} else {
sr = &SendReq{
Filesystem: fs.Path,
From: s.from.RelName(),
To: s.to.RelName(),
log.WithField("request", sr).Debug("initiate send request")
sres, sstream, err := ep.Sender().Send(ctx, sr)
if err != nil {
log.WithError(err).Error("send request failed")
return updateStateError(err)
if sstream == nil {
err := errors.New("send request did not return a stream, broken endpoint implementation")
return updateStateError(err)
rr := &ReceiveReq{
Filesystem: fs.Path,
ClearResumeToken: !sres.UsedResumeToken,
log.WithField("request", rr).Debug("initiate receive request")
err = ep.Receiver().Receive(ctx, rr, sstream)
if err != nil {
log.WithError(err).Error("receive request failed (might also be error on sender)")
// This failure could be due to
// - an unexpected exit of ZFS on the sending side
// - an unexpected exit of ZFS on the receiving side
// - a connectivity issue
return updateStateError(err)
log.Info("receive finished")
return updateStateCompleted()
func (s *FSReplicationStep) String() string {
if s.from == nil { // FIXME: ZFS semantics are that to is nil on non-incremental send
return fmt.Sprintf("%s%s (full)", s.fsrep.fs.Path, s.to.RelName())
} else {
return fmt.Sprintf("%s(%s => %s)", s.fsrep.fs.Path, s.from.RelName(), s.to.RelName())
Normal file
Normal file
@ -0,0 +1,137 @@
package replication
import (
type ReplicationEndpoint interface {
// Does not include placeholder filesystems
ListFilesystems(ctx context.Context) ([]*Filesystem, error)
ListFilesystemVersions(ctx context.Context, fs string) ([]*FilesystemVersion, error) // fix depS
Send(ctx context.Context, r *SendReq) (*SendRes, io.ReadCloser, error)
Receive(ctx context.Context, r *ReceiveReq, sendStream io.ReadCloser) error
type FilteredError struct{ fs string }
func NewFilteredError(fs string) FilteredError {
return FilteredError{fs}
func (f FilteredError) Error() string { return "endpoint does not allow access to filesystem " + f.fs }
type ReplicationMode int
const (
ReplicationModePull ReplicationMode = iota
type EndpointPair struct {
a, b ReplicationEndpoint
m ReplicationMode
func NewEndpointPairPull(sender, receiver ReplicationEndpoint) EndpointPair {
return EndpointPair{sender, receiver, ReplicationModePull}
func NewEndpointPairPush(sender, receiver ReplicationEndpoint) EndpointPair {
return EndpointPair{receiver, sender, ReplicationModePush}
func (p EndpointPair) Sender() ReplicationEndpoint {
switch p.m {
case ReplicationModePull:
return p.a
case ReplicationModePush:
return p.b
panic("should not be reached")
return nil
func (p EndpointPair) Receiver() ReplicationEndpoint {
switch p.m {
case ReplicationModePull:
return p.b
case ReplicationModePush:
return p.a
panic("should not be reached")
return nil
func (p EndpointPair) Mode() ReplicationMode {
return p.m
type contextKey int
const (
contextKeyLog contextKey = iota
//type Logger interface {
// Infof(fmt string, args ...interface{})
// Errorf(fmt string, args ...interface{})
//var _ Logger = nullLogger{}
//type nullLogger struct{}
//func (nullLogger) Infof(fmt string, args ...interface{}) {}
//func (nullLogger) Errorf(fmt string, args ...interface{}) {}
type Logger = logger.Logger
func ContextWithLogger(ctx context.Context, l Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, l)
func getLogger(ctx context.Context) Logger {
l, ok := ctx.Value(contextKeyLog).(Logger)
if !ok {
l = logger.NewNullLogger()
return l
func resolveConflict(conflict error) (path []*FilesystemVersion, msg string) {
if noCommonAncestor, ok := conflict.(*ConflictNoCommonAncestor); ok {
if len(noCommonAncestor.SortedReceiverVersions) == 0 {
// FIXME hard-coded replication policy: most recent
// snapshot as source
var mostRecentSnap *FilesystemVersion
for n := len(noCommonAncestor.SortedSenderVersions) - 1; n >= 0; n-- {
if noCommonAncestor.SortedSenderVersions[n].Type == FilesystemVersion_Snapshot {
mostRecentSnap = noCommonAncestor.SortedSenderVersions[n]
if mostRecentSnap == nil {
return nil, "no snapshots available on sender side"
return []*FilesystemVersion{mostRecentSnap}, fmt.Sprintf("start replication at most recent snapshot %s", mostRecentSnap.RelName())
return nil, "no automated way to handle conflict type"
// Replicate replicates filesystems from ep.Sender() to ep.Receiver().
// All filesystems presented by the sending side are replicated,
// unless the receiver rejects a Receive request with a *FilteredError.
// If an error occurs when replicating a filesystem, that error is logged to the logger in ctx.
// Replicate continues with the replication of the remaining file systems.
// Depending on the type of error, failed replications are retried in an unspecified order (currently FIFO).
func Replicate(ctx context.Context, ep EndpointPair, retryNow chan struct{}) {
r := Replication{}
r.Drive(ctx, ep, retryNow)
@ -83,18 +83,18 @@ func (m *MockIncrementalPathRecorder) Finished() bool {
type testLog struct {
t *testing.T
var _ replication.Logger = testLog{}
func (t testLog) Infof(fmt string, args ...interface{}) {
t.t.Logf(fmt, args)
func (t testLog) Errorf(fmt string, args ...interface{}) {
t.t.Logf(fmt, args)
//type testLog struct {
// t *testing.T
//var _ replication.Logger = testLog{}
//func (t testLog) Infof(fmt string, args ...interface{}) {
// t.t.Logf(fmt, args)
//func (t testLog) Errorf(fmt string, args ...interface{}) {
// t.t.Logf(fmt, args)
//func TestIncrementalPathReplicator_Replicate(t *testing.T) {
Normal file
Normal file
@ -0,0 +1,16 @@
// Code generated by "stringer -type=ReplicationState"; DO NOT EDIT.
package replication
import "strconv"
const _ReplicationState_name = "PlanningPlanningErrorWorkingWorkingWaitCompletedContextDone"
var _ReplicationState_index = [...]uint8{0, 8, 21, 28, 39, 48, 59}
func (i ReplicationState) String() string {
if i < 0 || i >= ReplicationState(len(_ReplicationState_index)-1) {
return "ReplicationState(" + strconv.FormatInt(int64(i), 10) + ")"
return _ReplicationState_name[_ReplicationState_index[i]:_ReplicationState_index[i+1]]
@ -1,472 +0,0 @@
package replication
import (
type ReplicationEndpoint interface {
// Does not include placeholder filesystems
ListFilesystems(ctx context.Context) ([]*Filesystem, error)
ListFilesystemVersions(ctx context.Context, fs string) ([]*FilesystemVersion, error) // fix depS
Send(ctx context.Context, r *SendReq) (*SendRes, io.ReadCloser, error)
Receive(ctx context.Context, r *ReceiveReq, sendStream io.ReadCloser) error
type FilteredError struct{ fs string }
func NewFilteredError(fs string) FilteredError {
return FilteredError{fs}
func (f FilteredError) Error() string { return "endpoint does not allow access to filesystem " + f.fs }
type ReplicationMode int
const (
ReplicationModePull ReplicationMode = iota
type EndpointPair struct {
a, b ReplicationEndpoint
m ReplicationMode
func NewEndpointPairPull(sender, receiver ReplicationEndpoint) EndpointPair {
return EndpointPair{sender, receiver, ReplicationModePull}
func NewEndpointPairPush(sender, receiver ReplicationEndpoint) EndpointPair {
return EndpointPair{receiver, sender, ReplicationModePush}
func (p EndpointPair) Sender() ReplicationEndpoint {
switch p.m {
case ReplicationModePull:
return p.a
case ReplicationModePush:
return p.b
panic("should not be reached")
return nil
func (p EndpointPair) Receiver() ReplicationEndpoint {
switch p.m {
case ReplicationModePull:
return p.b
case ReplicationModePush:
return p.a
panic("should not be reached")
return nil
func (p EndpointPair) Mode() ReplicationMode {
return p.m
type contextKey int
const (
contextKeyLog contextKey = iota
//type Logger interface {
// Infof(fmt string, args ...interface{})
// Errorf(fmt string, args ...interface{})
//var _ Logger = nullLogger{}
//type nullLogger struct{}
//func (nullLogger) Infof(fmt string, args ...interface{}) {}
//func (nullLogger) Errorf(fmt string, args ...interface{}) {}
type Logger = logger.Logger
func ContextWithLogger(ctx context.Context, l Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, l)
func getLogger(ctx context.Context) Logger {
l, ok := ctx.Value(contextKeyLog).(Logger)
if !ok {
l = logger.NewNullLogger()
return l
type replicationStep struct {
from, to *FilesystemVersion
fswork *replicateFSWork
func (s *replicationStep) String() string {
if s.from == nil { // FIXME: ZFS semantics are that to is nil on non-incremental send
return fmt.Sprintf("%s%s (full)", s.fswork.fs.Path, s.to.RelName())
} else {
return fmt.Sprintf("%s(%s => %s)", s.fswork.fs.Path, s.from.RelName(), s.to.RelName())
func newReplicationStep(from, to *FilesystemVersion) *replicationStep {
return &replicationStep{from: from, to: to}
type replicateFSWork struct {
fs *Filesystem
steps []*replicationStep
currentStep int
errorCount int
func newReplicateFSWork(fs *Filesystem) *replicateFSWork {
if fs == nil {
panic("implementation error")
return &replicateFSWork{
fs: fs,
steps: make([]*replicationStep, 0),
func newReplicateFSWorkWithConflict(fs *Filesystem, conflict error) *replicateFSWork {
// FIXME ignore conflict for now, but will be useful later when we make the replicationPlan exportable
return &replicateFSWork{
fs: fs,
steps: make([]*replicationStep, 0),
func (r *replicateFSWork) AddStep(step *replicationStep) {
if step == nil {
panic("implementation error")
if step.fswork != nil {
panic("implementation error")
step.fswork = r
r.steps = append(r.steps, step)
func (w *replicateFSWork) CurrentStepDate() time.Time {
if len(w.steps) == 0 {
return time.Time{}
toTime, err := w.steps[w.currentStep].to.CreationAsTime()
if err != nil {
panic(err) // implementation inconsistent: should not admit invalid FilesystemVersion objects
return toTime
func (w *replicateFSWork) CurrentStep() *replicationStep {
if w.currentStep >= len(w.steps) {
return nil
return w.steps[w.currentStep]
func (w *replicateFSWork) CompleteStep() {
type replicationPlan struct {
fsws []*replicateFSWork
func newReplicationPlan() *replicationPlan {
return &replicationPlan{
fsws: make([]*replicateFSWork, 0),
func (p *replicationPlan) addWork(work *replicateFSWork) {
p.fsws = append(p.fsws, work)
func (p *replicationPlan) executeOldestFirst(ctx context.Context, doStep func(fs *Filesystem, from, to *FilesystemVersion) tryRes) {
log := getLogger(ctx)
for {
select {
case <-ctx.Done():
log.WithError(ctx.Err()).Info("aborting replication due to context error")
// FIXME poor man's nested priority queue
pending := make([]*replicateFSWork, 0, len(p.fsws))
for _, fsw := range p.fsws {
if fsw.CurrentStep() != nil {
pending = append(pending, fsw)
sort.Slice(pending, func(i, j int) bool {
if pending[i].errorCount == pending[j].errorCount {
return pending[i].CurrentStepDate().Before(pending[j].CurrentStepDate())
return pending[i].errorCount < pending[j].errorCount
// pending is now sorted ascending by errorCount,CurrentStep().Creation
if len(pending) == 0 {
log.Info("replication complete")
fsw := pending[0]
step := fsw.CurrentStep()
if step == nil {
panic("implementation error")
log.WithField("step", step).Info("begin replication step")
res := doStep(step.fswork.fs, step.from, step.to)
if res.done {
log.Info("replication step successful")
fsw.errorCount = 0
} else {
log.Error("replication step failed, queuing for retry result")
func resolveConflict(conflict error) (path []*FilesystemVersion, msg string) {
if noCommonAncestor, ok := conflict.(*ConflictNoCommonAncestor); ok {
if len(noCommonAncestor.SortedReceiverVersions) == 0 {
// FIXME hard-coded replication policy: most recent
// snapshot as source
var mostRecentSnap *FilesystemVersion
for n := len(noCommonAncestor.SortedSenderVersions) - 1; n >= 0; n-- {
if noCommonAncestor.SortedSenderVersions[n].Type == FilesystemVersion_Snapshot {
mostRecentSnap = noCommonAncestor.SortedSenderVersions[n]
if mostRecentSnap == nil {
return nil, "no snapshots available on sender side"
return []*FilesystemVersion{mostRecentSnap}, fmt.Sprintf("start replication at most recent snapshot %s", mostRecentSnap.RelName())
return nil, "no automated way to handle conflict type"
// Replicate replicates filesystems from ep.Sender() to ep.Receiver().
// All filesystems presented by the sending side are replicated,
// unless the receiver rejects a Receive request with a *FilteredError.
// If an error occurs when replicating a filesystem, that error is logged to the logger in ctx.
// Replicate continues with the replication of the remaining file systems.
// Depending on the type of error, failed replications are retried in an unspecified order (currently FIFO).
func Replicate(ctx context.Context, ep EndpointPair) {
log := getLogger(ctx)
retryPlanTicker := time.NewTicker(15 * time.Second) // FIXME make configurable
defer retryPlanTicker.Stop()
var (
plan *replicationPlan
res tryRes
for {
log.Info("build replication plan")
plan, res = tryBuildReplicationPlan(ctx, ep)
if plan != nil {
log.WithField("result", res).Error("building replication plan failed, wait for retry timer result")
select {
case <-ctx.Done():
log.WithError(ctx.Err()).Info("aborting replication because context is done")
case <-retryPlanTicker.C:
// TODO also accept an external channel that allows us to tick
mainlog := log
plan.executeOldestFirst(ctx, func(fs *Filesystem, from, to *FilesystemVersion) tryRes {
log := mainlog.WithField("filesystem", fs.Path)
// FIXME refresh fs resume token
fs.ResumeToken = ""
var sr *SendReq
if fs.ResumeToken != "" {
sr = &SendReq{
Filesystem: fs.Path,
ResumeToken: fs.ResumeToken,
} else if from == nil {
sr = &SendReq{
Filesystem: fs.Path,
From: to.RelName(), // FIXME fix protocol to use To, like zfs does internally
} else {
sr = &SendReq{
Filesystem: fs.Path,
From: from.RelName(),
To: to.RelName(),
log.WithField("request", sr).Debug("initiate send request")
sres, sstream, err := ep.Sender().Send(ctx, sr)
if err != nil {
log.WithError(err).Error("send request failed")
return tryResFromEndpointError(err)
if sstream == nil {
log.Error("send request did not return a stream, broken endpoint implementation")
return tryRes{unfixable: true}
rr := &ReceiveReq{
Filesystem: fs.Path,
ClearResumeToken: !sres.UsedResumeToken,
log.WithField("request", rr).Debug("initiate receive request")
err = ep.Receiver().Receive(ctx, rr, sstream)
if err != nil {
log.WithError(err).Error("receive request failed (might also be error on sender)")
// This failure could be due to
// - an unexpected exit of ZFS on the sending side
// - an unexpected exit of ZFS on the receiving side
// - a connectivity issue
return tryResFromEndpointError(err)
log.Info("receive finished")
return tryRes{done: true}
type tryRes struct {
done bool
retry bool
unfixable bool
func tryResFromEndpointError(err error) tryRes {
if _, ok := err.(net.Error); ok {
return tryRes{retry: true}
return tryRes{unfixable: true}
func tryBuildReplicationPlan(ctx context.Context, ep EndpointPair) (*replicationPlan, tryRes) {
log := getLogger(ctx)
early := func(err error) (*replicationPlan, tryRes) {
return nil, tryResFromEndpointError(err)
sfss, err := ep.Sender().ListFilesystems(ctx)
if err != nil {
log.WithError(err).Error("error listing sender filesystems")
return early(err)
rfss, err := ep.Receiver().ListFilesystems(ctx)
if err != nil {
log.WithError(err).Error("error listing receiver filesystems")
return early(err)
plan := newReplicationPlan()
mainlog := log
for _, fs := range sfss {
log := mainlog.WithField("filesystem", fs.Path)
log.Info("assessing filesystem")
sfsvs, err := ep.Sender().ListFilesystemVersions(ctx, fs.Path)
if err != nil {
log.WithError(err).Error("cannot get remote filesystem versions")
return early(err)
if len(sfsvs) <= 1 {
log.Error("sender does not have any versions")
return nil, tryRes{unfixable: true}
receiverFSExists := false
for _, rfs := range rfss {
if rfs.Path == fs.Path {
receiverFSExists = true
var rfsvs []*FilesystemVersion
if receiverFSExists {
rfsvs, err = ep.Receiver().ListFilesystemVersions(ctx, fs.Path)
if err != nil {
if _, ok := err.(FilteredError); ok {
log.Info("receiver ignores filesystem")
log.WithError(err).Error("receiver error")
return early(err)
} else {
rfsvs = []*FilesystemVersion{}
path, conflict := IncrementalPath(rfsvs, sfsvs)
if conflict != nil {
var msg string
path, msg = resolveConflict(conflict) // no shadowing allowed!
if path != nil {
log.WithField("conflict", conflict).Info("conflict")
log.WithField("resolution", msg).Info("automatically resolved")
} else {
log.WithField("conflict", conflict).Error("conflict")
log.WithField("problem", msg).Error("cannot resolve conflict")
if path == nil {
plan.addWork(newReplicateFSWorkWithConflict(fs, conflict))
w := newReplicateFSWork(fs)
if len(path) == 1 {
step := newReplicationStep(nil, path[0])
} else {
for i := 0; i < len(path)-1; i++ {
step := newReplicationStep(path[i], path[i+1])
return plan, tryRes{done: true}
Reference in New Issue
Block a user