diff --git a/cmd/config_job_local.go b/cmd/config_job_local.go index d66bdce..f1902e1 100644 --- a/cmd/config_job_local.go +++ b/cmd/config_job_local.go @@ -8,7 +8,7 @@ import ( "github.com/pkg/errors" "github.com/zrepl/zrepl/zfs" "sync" - "github.com/zrepl/zrepl/cmd/replication" + "github.com/zrepl/zrepl/cmd/replication.v2" ) type LocalJob struct { @@ -146,7 +146,7 @@ outer: j.mainTask.Log().Debug("replicating from lhs to rhs") j.mainTask.Enter("replicate") - replication.Replicate(ctx, replication.NewEndpointPairPull(sender, receiver)) + replication.Replicate(ctx, replication.NewEndpointPairPull(sender, receiver), nil) // FIXME j.mainTask.Finish() diff --git a/cmd/config_job_pull.go b/cmd/config_job_pull.go index 85685e4..08d5f1e 100644 --- a/cmd/config_job_pull.go +++ b/cmd/config_job_pull.go @@ -1,6 +1,7 @@ package cmd import ( + "net" "os" "os/signal" "syscall" @@ -12,7 +13,7 @@ import ( "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "github.com/problame/go-streamrpc" - "github.com/zrepl/zrepl/cmd/replication" + "github.com/zrepl/zrepl/cmd/replication.v2" ) type PullJob struct { @@ -165,7 +166,10 @@ func (j *PullJob) doRun(ctx context.Context) { ConnConfig: STREAMRPC_CONFIG, } - client, err := streamrpc.NewClient(j.Connect, clientConf) + //client, err := streamrpc.NewClient(j.Connect, clientConf) + client, err := streamrpc.NewClient(&tcpConnecter{net.Dialer{ + Timeout: 10*time.Second, + }}, clientConf) defer client.Close() j.task.Enter("pull") @@ -182,10 +186,26 @@ func (j *PullJob) doRun(ctx context.Context) { return } + usr2 := make(chan os.Signal) + defer close(usr2) + signal.Notify(usr2, syscall.SIGUSR2) + defer signal.Stop(usr2) + retryNow := make(chan struct{}, 1) // buffered so we don't leak the goroutine + go func() { + for { + sig := <-usr2 + if sig != nil { + retryNow <- struct{}{} + } else { + break + } + } + }() + ctx = replication.ContextWithLogger(ctx, replicationLogAdaptor{j.task.Log().WithField("subsystem", "replication")}) ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{j.task.Log().WithField("subsystem", "rpc.protocol")}) ctx = context.WithValue(ctx, contextKeyLog, j.task.Log().WithField("subsystem", "rpc.endpoint")) - replication.Replicate(ctx, replication.NewEndpointPairPull(sender, puller)) + replication.Replicate(ctx, replication.NewEndpointPairPull(sender, puller), retryNow) client.Close() j.task.Finish() diff --git a/cmd/config_job_source.go b/cmd/config_job_source.go index f6b7851..a3ff349 100644 --- a/cmd/config_job_source.go +++ b/cmd/config_job_source.go @@ -146,7 +146,9 @@ func (j *SourceJob) Pruner(task *Task, side PrunePolicySide, dryRun bool) (p Pru func (j *SourceJob) serve(ctx context.Context, task *Task) { - listener, err := j.Serve.Listen() + //listener, err := j.Serve.Listen() + + listener, err := net.Listen("tcp", ":8888") if err != nil { task.Log().WithError(err).Error("error listening") return diff --git a/cmd/replication.go b/cmd/replication.go index ff18bb0..b7d1c56 100644 --- a/cmd/replication.go +++ b/cmd/replication.go @@ -2,7 +2,7 @@ package cmd import ( "fmt" - "github.com/zrepl/zrepl/cmd/replication" + "github.com/zrepl/zrepl/cmd/replication.v2" "github.com/problame/go-streamrpc" "github.com/zrepl/zrepl/zfs" "io" diff --git a/cmd/replication/diff.go b/cmd/replication.v2/diff.go similarity index 100% rename from cmd/replication/diff.go rename to cmd/replication.v2/diff.go diff --git a/cmd/replication/diff_test.go b/cmd/replication.v2/diff_test.go similarity index 100% rename from cmd/replication/diff_test.go rename 
to cmd/replication.v2/diff_test.go diff --git a/cmd/replication.v2/fsreplicationstate_string.go b/cmd/replication.v2/fsreplicationstate_string.go new file mode 100644 index 0000000..4859c2a --- /dev/null +++ b/cmd/replication.v2/fsreplicationstate_string.go @@ -0,0 +1,32 @@ +// Code generated by "stringer -type=FSReplicationState"; DO NOT EDIT. + +package replication + +import "strconv" + +const ( + _FSReplicationState_name_0 = "FSQueuedFSActive" + _FSReplicationState_name_1 = "FSRetry" + _FSReplicationState_name_2 = "FSPermanentError" + _FSReplicationState_name_3 = "FSCompleted" +) + +var ( + _FSReplicationState_index_0 = [...]uint8{0, 8, 16} +) + +func (i FSReplicationState) String() string { + switch { + case 1 <= i && i <= 2: + i -= 1 + return _FSReplicationState_name_0[_FSReplicationState_index_0[i]:_FSReplicationState_index_0[i+1]] + case i == 4: + return _FSReplicationState_name_1 + case i == 8: + return _FSReplicationState_name_2 + case i == 16: + return _FSReplicationState_name_3 + default: + return "FSReplicationState(" + strconv.FormatInt(int64(i), 10) + ")" + } +} diff --git a/cmd/replication.v2/fsreplicationstepstate_string.go b/cmd/replication.v2/fsreplicationstepstate_string.go new file mode 100644 index 0000000..6f45363 --- /dev/null +++ b/cmd/replication.v2/fsreplicationstepstate_string.go @@ -0,0 +1,16 @@ +// Code generated by "stringer -type=FSReplicationStepState"; DO NOT EDIT. + +package replication + +import "strconv" + +const _FSReplicationStepState_name = "StepPendingStepActiveStepRetryStepPermanentErrorStepCompleted" + +var _FSReplicationStepState_index = [...]uint8{0, 11, 21, 30, 48, 61} + +func (i FSReplicationStepState) String() string { + if i < 0 || i >= FSReplicationStepState(len(_FSReplicationStepState_index)-1) { + return "FSReplicationStepState(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _FSReplicationStepState_name[_FSReplicationStepState_index[i]:_FSReplicationStepState_index[i+1]] +} diff --git a/cmd/replication/pdu.pb.go b/cmd/replication.v2/pdu.pb.go similarity index 100% rename from cmd/replication/pdu.pb.go rename to cmd/replication.v2/pdu.pb.go diff --git a/cmd/replication/pdu.proto b/cmd/replication.v2/pdu.proto similarity index 100% rename from cmd/replication/pdu.proto rename to cmd/replication.v2/pdu.proto diff --git a/cmd/replication/pdu_extras.go b/cmd/replication.v2/pdu_extras.go similarity index 100% rename from cmd/replication/pdu_extras.go rename to cmd/replication.v2/pdu_extras.go diff --git a/cmd/replication/pdu_test.go b/cmd/replication.v2/pdu_test.go similarity index 100% rename from cmd/replication/pdu_test.go rename to cmd/replication.v2/pdu_test.go diff --git a/cmd/replication.v2/plan.go b/cmd/replication.v2/plan.go new file mode 100644 index 0000000..d0833e2 --- /dev/null +++ b/cmd/replication.v2/plan.go @@ -0,0 +1,474 @@ +package replication + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "sort" + "time" +) + +//go:generate stringer -type=ReplicationState +type ReplicationState int + +const ( + Planning ReplicationState = iota + PlanningError + Working + WorkingWait + Completed + ContextDone +) + +type Replication struct { + state ReplicationState + + // Working / WorkingWait + + pending, completed []*FSReplication + + // PlanningError + planningError error + + // ContextDone + contextError error +} + +type FSReplicationState int + +//go:generate stringer -type=FSReplicationState +const ( + FSQueued FSReplicationState = 1 << iota + FSActive + FSRetry + FSPermanentError + FSCompleted +) + +type FSReplication 
struct { + state FSReplicationState + fs *Filesystem + permanentError error + retryAt time.Time + completed, pending []*FSReplicationStep +} + +func newFSReplicationPermanentError(fs *Filesystem, err error) *FSReplication { + return &FSReplication{ + state: FSPermanentError, + fs: fs, + permanentError: err, + } +} + +type FSReplicationBuilder struct { + r *FSReplication + steps []*FSReplicationStep +} + +func buildNewFSReplication(fs *Filesystem) *FSReplicationBuilder { + return &FSReplicationBuilder{ + r: &FSReplication{ + fs: fs, + pending: make([]*FSReplicationStep, 0), + }, + } +} + +func (b *FSReplicationBuilder) AddStep(from, to *FilesystemVersion) *FSReplication { + step := &FSReplicationStep{ + state: StepPending, + fsrep: b.r, + from: from, + to: to, + } + b.r.pending = append(b.r.pending, step) + return b.r +} + +func (b *FSReplicationBuilder) Complete() *FSReplication { + if len(b.r.pending) > 0 { + b.r.state = FSQueued + } else { + b.r.state = FSCompleted + } + r := b.r + return r +} + +//go:generate stringer -type=FSReplicationStepState +type FSReplicationStepState int + +const ( + StepPending FSReplicationStepState = iota + StepActive + StepRetry + StepPermanentError + StepCompleted +) + +type FSReplicationStep struct { + state FSReplicationStepState + from, to *FilesystemVersion + fsrep *FSReplication + + // both retry and permanent error + err error +} + +func (r *Replication) Drive(ctx context.Context, ep EndpointPair, retryNow chan struct{}) { + for !(r.state == Completed || r.state == ContextDone) { + pre := r.state + preTime := time.Now() + r.doDrive(ctx, ep, retryNow) + delta := time.Now().Sub(preTime) + post := r.state + getLogger(ctx). + WithField("transition", fmt.Sprintf("%s => %s", pre, post)). + WithField("duration", delta). 
+ Debug("state transition") + } +} + +func (r *Replication) doDrive(ctx context.Context, ep EndpointPair, retryNow chan struct{}) { + + switch r.state { + + case Planning: + r.tryBuildPlan(ctx, ep) + + case PlanningError: + w := time.NewTimer(10 * time.Second) // FIXME constant make configurable + defer w.Stop() + select { + case <-ctx.Done(): + r.state = ContextDone + r.contextError = ctx.Err() + case <-retryNow: + r.state = Planning + r.planningError = nil + case <-w.C: + r.state = Planning + r.planningError = nil + } + + case Working: + + if len(r.pending) == 0 { + r.state = Completed + return + } + + sort.Slice(r.pending, func(i, j int) bool { + a, b := r.pending[i], r.pending[j] + statePrio := func(x *FSReplication) int { + if !(x.state == FSQueued || x.state == FSRetry) { + panic(x) + } + if x.state == FSQueued { + return 0 + } else { + return 1 + } + } + aprio, bprio := statePrio(a), statePrio(b) + if aprio != bprio { + return aprio < bprio + } + // now we know they are the same state + if a.state == FSQueued { + return a.nextStepDate().Before(b.nextStepDate()) + } + if a.state == FSRetry { + return a.retryAt.Before(b.retryAt) + } + panic("should not be reached") + }) + + fsrep := r.pending[0] + + if fsrep.state == FSRetry { + r.state = WorkingWait + return + } + if fsrep.state != FSQueued { + panic(fsrep) + } + + fsState := fsrep.takeStep(ctx, ep) + if fsState&(FSPermanentError|FSCompleted) != 0 { + r.pending = r.pending[1:] + r.completed = append(r.completed, fsrep) + } + + case WorkingWait: + fsrep := r.pending[0] + w := time.NewTimer(fsrep.retryAt.Sub(time.Now())) + defer w.Stop() + select { + case <-ctx.Done(): + r.state = ContextDone + r.contextError = ctx.Err() + case <-retryNow: + for _, fsr := range r.pending { + fsr.retryNow() + } + r.state = Working + case <-w.C: + fsrep.retryNow() // avoid timer jitter + r.state = Working + } + } +} + +func (r *Replication) tryBuildPlan(ctx context.Context, ep EndpointPair) ReplicationState { + + log := getLogger(ctx) + + planningError := func(err error) ReplicationState { + r.state = PlanningError + r.planningError = err + return r.state + } + done := func() ReplicationState { + r.state = Working + r.planningError = nil + return r.state + } + + sfss, err := ep.Sender().ListFilesystems(ctx) + if err != nil { + log.WithError(err).Error("error listing sender filesystems") + return planningError(err) + } + + rfss, err := ep.Receiver().ListFilesystems(ctx) + if err != nil { + log.WithError(err).Error("error listing receiver filesystems") + return planningError(err) + } + + r.pending = make([]*FSReplication, 0, len(sfss)) + r.completed = make([]*FSReplication, 0, len(sfss)) + mainlog := log + for _, fs := range sfss { + + log := mainlog.WithField("filesystem", fs.Path) + + log.Info("assessing filesystem") + + sfsvs, err := ep.Sender().ListFilesystemVersions(ctx, fs.Path) + if err != nil { + log.WithError(err).Error("cannot get remote filesystem versions") + return planningError(err) + } + + if len(sfsvs) <= 1 { + err := errors.New("sender does not have any versions") + log.Error(err.Error()) + r.completed = append(r.completed, newFSReplicationPermanentError(fs, err)) + continue + } + + receiverFSExists := false + for _, rfs := range rfss { + if rfs.Path == fs.Path { + receiverFSExists = true + } + } + + var rfsvs []*FilesystemVersion + if receiverFSExists { + rfsvs, err = ep.Receiver().ListFilesystemVersions(ctx, fs.Path) + if err != nil { + if _, ok := err.(FilteredError); ok { + log.Info("receiver ignores filesystem") + continue + } + 
log.WithError(err).Error("receiver error") + return planningError(err) + } + } else { + rfsvs = []*FilesystemVersion{} + } + + path, conflict := IncrementalPath(rfsvs, sfsvs) + if conflict != nil { + var msg string + path, msg = resolveConflict(conflict) // no shadowing allowed! + if path != nil { + log.WithField("conflict", conflict).Info("conflict") + log.WithField("resolution", msg).Info("automatically resolved") + } else { + log.WithField("conflict", conflict).Error("conflict") + log.WithField("problem", msg).Error("cannot resolve conflict") + } + } + if path == nil { + r.completed = append(r.completed, newFSReplicationPermanentError(fs, conflict)) + continue + } + + fsreplbuilder := buildNewFSReplication(fs) + if len(path) == 1 { + fsreplbuilder.AddStep(nil, path[0]) + } else { + for i := 0; i < len(path)-1; i++ { + fsreplbuilder.AddStep(path[i], path[i+1]) + } + } + fsrepl := fsreplbuilder.Complete() + switch fsrepl.state { + case FSCompleted: + r.completed = append(r.completed, fsreplbuilder.Complete()) + case FSQueued: + r.pending = append(r.pending, fsreplbuilder.Complete()) + default: + panic(fsrepl) + } + + } + + return done() +} + +func (f *FSReplication) nextStepDate() time.Time { + if f.state != FSQueued { + panic(f) + } + ct, err := f.pending[0].to.CreationAsTime() + if err != nil { + panic(err) // FIXME + } + return ct +} + +func (f *FSReplication) takeStep(ctx context.Context, ep EndpointPair) FSReplicationState { + if f.state != FSQueued { + panic(f) + } + + f.state = FSActive + step := f.pending[0] + stepState := step.do(ctx, ep) + + switch stepState { + case StepCompleted: + f.pending = f.pending[1:] + f.completed = append(f.completed, step) + if len(f.pending) > 0 { + f.state = FSQueued + } else { + f.state = FSCompleted + } + + case StepRetry: + f.state = FSRetry + f.retryAt = time.Now().Add(10 * time.Second) // FIXME hardcoded constant + + case StepPermanentError: + f.state = FSPermanentError + + } + return f.state +} + +func (f *FSReplication) retryNow() { + if f.state != FSRetry { + panic(f) + } + f.retryAt = time.Time{} + f.state = FSQueued +} + +func (s *FSReplicationStep) do(ctx context.Context, ep EndpointPair) FSReplicationStepState { + + fs := s.fsrep.fs + + log := getLogger(ctx). + WithField("filesystem", fs.Path). 
+ WithField("step", s.String()) + + updateStateError := func(err error) FSReplicationStepState { + s.err = err + switch err { + case io.EOF: fallthrough + case io.ErrUnexpectedEOF: fallthrough + case io.ErrClosedPipe: + return StepRetry + } + if _, ok := err.(net.Error); ok { + return StepRetry + } + return StepPermanentError + } + + updateStateCompleted := func() FSReplicationStepState { + s.err = nil + s.state = StepCompleted + return s.state + } + + // FIXME refresh fs resume token + fs.ResumeToken = "" + + var sr *SendReq + if fs.ResumeToken != "" { + sr = &SendReq{ + Filesystem: fs.Path, + ResumeToken: fs.ResumeToken, + } + } else if s.from == nil { + sr = &SendReq{ + Filesystem: fs.Path, + From: s.to.RelName(), // FIXME fix protocol to use To, like zfs does internally + } + } else { + sr = &SendReq{ + Filesystem: fs.Path, + From: s.from.RelName(), + To: s.to.RelName(), + } + } + + log.WithField("request", sr).Debug("initiate send request") + sres, sstream, err := ep.Sender().Send(ctx, sr) + if err != nil { + log.WithError(err).Error("send request failed") + return updateStateError(err) + } + if sstream == nil { + err := errors.New("send request did not return a stream, broken endpoint implementation") + return updateStateError(err) + } + + rr := &ReceiveReq{ + Filesystem: fs.Path, + ClearResumeToken: !sres.UsedResumeToken, + } + log.WithField("request", rr).Debug("initiate receive request") + err = ep.Receiver().Receive(ctx, rr, sstream) + if err != nil { + log.WithError(err).Error("receive request failed (might also be error on sender)") + sstream.Close() + // This failure could be due to + // - an unexpected exit of ZFS on the sending side + // - an unexpected exit of ZFS on the receiving side + // - a connectivity issue + return updateStateError(err) + } + log.Info("receive finished") + return updateStateCompleted() + +} + +func (s *FSReplicationStep) String() string { + if s.from == nil { // FIXME: ZFS semantics are that to is nil on non-incremental send + return fmt.Sprintf("%s%s (full)", s.fsrep.fs.Path, s.to.RelName()) + } else { + return fmt.Sprintf("%s(%s => %s)", s.fsrep.fs.Path, s.from.RelName(), s.to.RelName()) + } +} + diff --git a/cmd/replication.v2/replication.go b/cmd/replication.v2/replication.go new file mode 100644 index 0000000..8d567fc --- /dev/null +++ b/cmd/replication.v2/replication.go @@ -0,0 +1,137 @@ +package replication + +import ( + "context" + "fmt" + "github.com/zrepl/zrepl/logger" + "io" +) + +type ReplicationEndpoint interface { + // Does not include placeholder filesystems + ListFilesystems(ctx context.Context) ([]*Filesystem, error) + ListFilesystemVersions(ctx context.Context, fs string) ([]*FilesystemVersion, error) // fix depS + Send(ctx context.Context, r *SendReq) (*SendRes, io.ReadCloser, error) + Receive(ctx context.Context, r *ReceiveReq, sendStream io.ReadCloser) error +} + +type FilteredError struct{ fs string } + +func NewFilteredError(fs string) FilteredError { + return FilteredError{fs} +} + +func (f FilteredError) Error() string { return "endpoint does not allow access to filesystem " + f.fs } + +type ReplicationMode int + +const ( + ReplicationModePull ReplicationMode = iota + ReplicationModePush +) + +type EndpointPair struct { + a, b ReplicationEndpoint + m ReplicationMode +} + +func NewEndpointPairPull(sender, receiver ReplicationEndpoint) EndpointPair { + return EndpointPair{sender, receiver, ReplicationModePull} +} + +func NewEndpointPairPush(sender, receiver ReplicationEndpoint) EndpointPair { + return EndpointPair{receiver, 
sender, ReplicationModePush} +} + +func (p EndpointPair) Sender() ReplicationEndpoint { + switch p.m { + case ReplicationModePull: + return p.a + case ReplicationModePush: + return p.b + } + panic("should not be reached") + return nil +} + +func (p EndpointPair) Receiver() ReplicationEndpoint { + switch p.m { + case ReplicationModePull: + return p.b + case ReplicationModePush: + return p.a + } + panic("should not be reached") + return nil +} + +func (p EndpointPair) Mode() ReplicationMode { + return p.m +} + +type contextKey int + +const ( + contextKeyLog contextKey = iota +) + +//type Logger interface { +// Infof(fmt string, args ...interface{}) +// Errorf(fmt string, args ...interface{}) +//} + +//var _ Logger = nullLogger{} + +//type nullLogger struct{} +// +//func (nullLogger) Infof(fmt string, args ...interface{}) {} +//func (nullLogger) Errorf(fmt string, args ...interface{}) {} + +type Logger = logger.Logger + +func ContextWithLogger(ctx context.Context, l Logger) context.Context { + return context.WithValue(ctx, contextKeyLog, l) +} + +func getLogger(ctx context.Context) Logger { + l, ok := ctx.Value(contextKeyLog).(Logger) + if !ok { + l = logger.NewNullLogger() + } + return l +} + +func resolveConflict(conflict error) (path []*FilesystemVersion, msg string) { + if noCommonAncestor, ok := conflict.(*ConflictNoCommonAncestor); ok { + if len(noCommonAncestor.SortedReceiverVersions) == 0 { + // FIXME hard-coded replication policy: most recent + // snapshot as source + var mostRecentSnap *FilesystemVersion + for n := len(noCommonAncestor.SortedSenderVersions) - 1; n >= 0; n-- { + if noCommonAncestor.SortedSenderVersions[n].Type == FilesystemVersion_Snapshot { + mostRecentSnap = noCommonAncestor.SortedSenderVersions[n] + break + } + } + if mostRecentSnap == nil { + return nil, "no snapshots available on sender side" + } + return []*FilesystemVersion{mostRecentSnap}, fmt.Sprintf("start replication at most recent snapshot %s", mostRecentSnap.RelName()) + } + } + return nil, "no automated way to handle conflict type" +} + +// Replicate replicates filesystems from ep.Sender() to ep.Receiver(). +// +// All filesystems presented by the sending side are replicated, +// unless the receiver rejects a Receive request with a *FilteredError. +// +// If an error occurs when replicating a filesystem, that error is logged to the logger in ctx. +// Replicate continues with the replication of the remaining file systems. +// Depending on the type of error, failed replications are retried in an unspecified order (currently FIFO). 
+func Replicate(ctx context.Context, ep EndpointPair, retryNow chan struct{}) { + r := Replication{} + r.Drive(ctx, ep, retryNow) +} + diff --git a/cmd/replication/replication_test.go b/cmd/replication.v2/replication_test.go similarity index 93% rename from cmd/replication/replication_test.go rename to cmd/replication.v2/replication_test.go index 36faf55..00b3868 100644 --- a/cmd/replication/replication_test.go +++ b/cmd/replication.v2/replication_test.go @@ -83,18 +83,18 @@ func (m *MockIncrementalPathRecorder) Finished() bool { // //} -type testLog struct { - t *testing.T -} - -var _ replication.Logger = testLog{} - -func (t testLog) Infof(fmt string, args ...interface{}) { - t.t.Logf(fmt, args) -} -func (t testLog) Errorf(fmt string, args ...interface{}) { - t.t.Logf(fmt, args) -} +//type testLog struct { +// t *testing.T +//} +// +//var _ replication.Logger = testLog{} +// +//func (t testLog) Infof(fmt string, args ...interface{}) { +// t.t.Logf(fmt, args) +//} +//func (t testLog) Errorf(fmt string, args ...interface{}) { +// t.t.Logf(fmt, args) +//} //func TestIncrementalPathReplicator_Replicate(t *testing.T) { diff --git a/cmd/replication.v2/replicationstate_string.go b/cmd/replication.v2/replicationstate_string.go new file mode 100644 index 0000000..7e4c0ac --- /dev/null +++ b/cmd/replication.v2/replicationstate_string.go @@ -0,0 +1,16 @@ +// Code generated by "stringer -type=ReplicationState"; DO NOT EDIT. + +package replication + +import "strconv" + +const _ReplicationState_name = "PlanningPlanningErrorWorkingWorkingWaitCompletedContextDone" + +var _ReplicationState_index = [...]uint8{0, 8, 21, 28, 39, 48, 59} + +func (i ReplicationState) String() string { + if i < 0 || i >= ReplicationState(len(_ReplicationState_index)-1) { + return "ReplicationState(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _ReplicationState_name[_ReplicationState_index[i]:_ReplicationState_index[i+1]] +} diff --git a/cmd/replication/replication.go b/cmd/replication/replication.go deleted file mode 100644 index 8f9ea4d..0000000 --- a/cmd/replication/replication.go +++ /dev/null @@ -1,472 +0,0 @@ -package replication - -import ( - "context" - "fmt" - "github.com/zrepl/zrepl/logger" - "io" - "net" - "sort" - "time" -) - -type ReplicationEndpoint interface { - // Does not include placeholder filesystems - ListFilesystems(ctx context.Context) ([]*Filesystem, error) - ListFilesystemVersions(ctx context.Context, fs string) ([]*FilesystemVersion, error) // fix depS - Send(ctx context.Context, r *SendReq) (*SendRes, io.ReadCloser, error) - Receive(ctx context.Context, r *ReceiveReq, sendStream io.ReadCloser) error -} - -type FilteredError struct{ fs string } - -func NewFilteredError(fs string) FilteredError { - return FilteredError{fs} -} - -func (f FilteredError) Error() string { return "endpoint does not allow access to filesystem " + f.fs } - -type ReplicationMode int - -const ( - ReplicationModePull ReplicationMode = iota - ReplicationModePush -) - -type EndpointPair struct { - a, b ReplicationEndpoint - m ReplicationMode -} - -func NewEndpointPairPull(sender, receiver ReplicationEndpoint) EndpointPair { - return EndpointPair{sender, receiver, ReplicationModePull} -} - -func NewEndpointPairPush(sender, receiver ReplicationEndpoint) EndpointPair { - return EndpointPair{receiver, sender, ReplicationModePush} -} - -func (p EndpointPair) Sender() ReplicationEndpoint { - switch p.m { - case ReplicationModePull: - return p.a - case ReplicationModePush: - return p.b - } - panic("should not be reached") - 
return nil -} - -func (p EndpointPair) Receiver() ReplicationEndpoint { - switch p.m { - case ReplicationModePull: - return p.b - case ReplicationModePush: - return p.a - } - panic("should not be reached") - return nil -} - -func (p EndpointPair) Mode() ReplicationMode { - return p.m -} - -type contextKey int - -const ( - contextKeyLog contextKey = iota -) - -//type Logger interface { -// Infof(fmt string, args ...interface{}) -// Errorf(fmt string, args ...interface{}) -//} - -//var _ Logger = nullLogger{} - -//type nullLogger struct{} -// -//func (nullLogger) Infof(fmt string, args ...interface{}) {} -//func (nullLogger) Errorf(fmt string, args ...interface{}) {} - -type Logger = logger.Logger - -func ContextWithLogger(ctx context.Context, l Logger) context.Context { - return context.WithValue(ctx, contextKeyLog, l) -} - -func getLogger(ctx context.Context) Logger { - l, ok := ctx.Value(contextKeyLog).(Logger) - if !ok { - l = logger.NewNullLogger() - } - return l -} - -type replicationStep struct { - from, to *FilesystemVersion - fswork *replicateFSWork -} - -func (s *replicationStep) String() string { - if s.from == nil { // FIXME: ZFS semantics are that to is nil on non-incremental send - return fmt.Sprintf("%s%s (full)", s.fswork.fs.Path, s.to.RelName()) - } else { - return fmt.Sprintf("%s(%s => %s)", s.fswork.fs.Path, s.from.RelName(), s.to.RelName()) - } -} - -func newReplicationStep(from, to *FilesystemVersion) *replicationStep { - return &replicationStep{from: from, to: to} -} - -type replicateFSWork struct { - fs *Filesystem - steps []*replicationStep - currentStep int - errorCount int -} - -func newReplicateFSWork(fs *Filesystem) *replicateFSWork { - if fs == nil { - panic("implementation error") - } - return &replicateFSWork{ - fs: fs, - steps: make([]*replicationStep, 0), - } -} - -func newReplicateFSWorkWithConflict(fs *Filesystem, conflict error) *replicateFSWork { - // FIXME ignore conflict for now, but will be useful later when we make the replicationPlan exportable - return &replicateFSWork{ - fs: fs, - steps: make([]*replicationStep, 0), - } -} - -func (r *replicateFSWork) AddStep(step *replicationStep) { - if step == nil { - panic("implementation error") - } - if step.fswork != nil { - panic("implementation error") - } - step.fswork = r - r.steps = append(r.steps, step) -} - -func (w *replicateFSWork) CurrentStepDate() time.Time { - if len(w.steps) == 0 { - return time.Time{} - } - toTime, err := w.steps[w.currentStep].to.CreationAsTime() - if err != nil { - panic(err) // implementation inconsistent: should not admit invalid FilesystemVersion objects - } - return toTime -} - -func (w *replicateFSWork) CurrentStep() *replicationStep { - if w.currentStep >= len(w.steps) { - return nil - } - return w.steps[w.currentStep] -} - -func (w *replicateFSWork) CompleteStep() { - w.currentStep++ -} - -type replicationPlan struct { - fsws []*replicateFSWork -} - -func newReplicationPlan() *replicationPlan { - return &replicationPlan{ - fsws: make([]*replicateFSWork, 0), - } -} - -func (p *replicationPlan) addWork(work *replicateFSWork) { - p.fsws = append(p.fsws, work) -} - -func (p *replicationPlan) executeOldestFirst(ctx context.Context, doStep func(fs *Filesystem, from, to *FilesystemVersion) tryRes) { - log := getLogger(ctx) - - for { - select { - case <-ctx.Done(): - log.WithError(ctx.Err()).Info("aborting replication due to context error") - return - default: - } - - // FIXME poor man's nested priority queue - pending := make([]*replicateFSWork, 0, len(p.fsws)) - for _, fsw 
:= range p.fsws { - if fsw.CurrentStep() != nil { - pending = append(pending, fsw) - } - } - sort.Slice(pending, func(i, j int) bool { - if pending[i].errorCount == pending[j].errorCount { - return pending[i].CurrentStepDate().Before(pending[j].CurrentStepDate()) - } - return pending[i].errorCount < pending[j].errorCount - }) - // pending is now sorted ascending by errorCount,CurrentStep().Creation - - if len(pending) == 0 { - log.Info("replication complete") - return - } - - fsw := pending[0] - step := fsw.CurrentStep() - if step == nil { - panic("implementation error") - } - - log.WithField("step", step).Info("begin replication step") - res := doStep(step.fswork.fs, step.from, step.to) - - if res.done { - log.Info("replication step successful") - fsw.errorCount = 0 - fsw.CompleteStep() - } else { - log.Error("replication step failed, queuing for retry result") - fsw.errorCount++ - } - - } - -} - -func resolveConflict(conflict error) (path []*FilesystemVersion, msg string) { - if noCommonAncestor, ok := conflict.(*ConflictNoCommonAncestor); ok { - if len(noCommonAncestor.SortedReceiverVersions) == 0 { - // FIXME hard-coded replication policy: most recent - // snapshot as source - var mostRecentSnap *FilesystemVersion - for n := len(noCommonAncestor.SortedSenderVersions) - 1; n >= 0; n-- { - if noCommonAncestor.SortedSenderVersions[n].Type == FilesystemVersion_Snapshot { - mostRecentSnap = noCommonAncestor.SortedSenderVersions[n] - break - } - } - if mostRecentSnap == nil { - return nil, "no snapshots available on sender side" - } - return []*FilesystemVersion{mostRecentSnap}, fmt.Sprintf("start replication at most recent snapshot %s", mostRecentSnap.RelName()) - } - } - return nil, "no automated way to handle conflict type" -} - -// Replicate replicates filesystems from ep.Sender() to ep.Receiver(). -// -// All filesystems presented by the sending side are replicated, -// unless the receiver rejects a Receive request with a *FilteredError. -// -// If an error occurs when replicating a filesystem, that error is logged to the logger in ctx. -// Replicate continues with the replication of the remaining file systems. -// Depending on the type of error, failed replications are retried in an unspecified order (currently FIFO). 
-func Replicate(ctx context.Context, ep EndpointPair) { - - log := getLogger(ctx) - - retryPlanTicker := time.NewTicker(15 * time.Second) // FIXME make configurable - defer retryPlanTicker.Stop() - - var ( - plan *replicationPlan - res tryRes - ) - for { - log.Info("build replication plan") - plan, res = tryBuildReplicationPlan(ctx, ep) - if plan != nil { - break - } - log.WithField("result", res).Error("building replication plan failed, wait for retry timer result") - select { - case <-ctx.Done(): - log.WithError(ctx.Err()).Info("aborting replication because context is done") - return - case <-retryPlanTicker.C: - // TODO also accept an external channel that allows us to tick - } - } - retryPlanTicker.Stop() - - mainlog := log - plan.executeOldestFirst(ctx, func(fs *Filesystem, from, to *FilesystemVersion) tryRes { - - log := mainlog.WithField("filesystem", fs.Path) - - // FIXME refresh fs resume token - fs.ResumeToken = "" - - var sr *SendReq - if fs.ResumeToken != "" { - sr = &SendReq{ - Filesystem: fs.Path, - ResumeToken: fs.ResumeToken, - } - } else if from == nil { - sr = &SendReq{ - Filesystem: fs.Path, - From: to.RelName(), // FIXME fix protocol to use To, like zfs does internally - } - } else { - sr = &SendReq{ - Filesystem: fs.Path, - From: from.RelName(), - To: to.RelName(), - } - } - - log.WithField("request", sr).Debug("initiate send request") - sres, sstream, err := ep.Sender().Send(ctx, sr) - if err != nil { - log.WithError(err).Error("send request failed") - return tryResFromEndpointError(err) - } - if sstream == nil { - log.Error("send request did not return a stream, broken endpoint implementation") - return tryRes{unfixable: true} - } - - rr := &ReceiveReq{ - Filesystem: fs.Path, - ClearResumeToken: !sres.UsedResumeToken, - } - log.WithField("request", rr).Debug("initiate receive request") - err = ep.Receiver().Receive(ctx, rr, sstream) - if err != nil { - log.WithError(err).Error("receive request failed (might also be error on sender)") - sstream.Close() - // This failure could be due to - // - an unexpected exit of ZFS on the sending side - // - an unexpected exit of ZFS on the receiving side - // - a connectivity issue - return tryResFromEndpointError(err) - } - log.Info("receive finished") - return tryRes{done: true} - - }) - -} - -type tryRes struct { - done bool - retry bool - unfixable bool -} - -func tryResFromEndpointError(err error) tryRes { - if _, ok := err.(net.Error); ok { - return tryRes{retry: true} - } - return tryRes{unfixable: true} -} - -func tryBuildReplicationPlan(ctx context.Context, ep EndpointPair) (*replicationPlan, tryRes) { - - log := getLogger(ctx) - - early := func(err error) (*replicationPlan, tryRes) { - return nil, tryResFromEndpointError(err) - } - - sfss, err := ep.Sender().ListFilesystems(ctx) - if err != nil { - log.WithError(err).Error("error listing sender filesystems") - return early(err) - } - - rfss, err := ep.Receiver().ListFilesystems(ctx) - if err != nil { - log.WithError(err).Error("error listing receiver filesystems") - return early(err) - } - - plan := newReplicationPlan() - mainlog := log - for _, fs := range sfss { - - log := mainlog.WithField("filesystem", fs.Path) - - log.Info("assessing filesystem") - - sfsvs, err := ep.Sender().ListFilesystemVersions(ctx, fs.Path) - if err != nil { - log.WithError(err).Error("cannot get remote filesystem versions") - return early(err) - } - - if len(sfsvs) <= 1 { - log.Error("sender does not have any versions") - return nil, tryRes{unfixable: true} - } - - receiverFSExists := false - 
for _, rfs := range rfss { - if rfs.Path == fs.Path { - receiverFSExists = true - } - } - - var rfsvs []*FilesystemVersion - if receiverFSExists { - rfsvs, err = ep.Receiver().ListFilesystemVersions(ctx, fs.Path) - if err != nil { - if _, ok := err.(FilteredError); ok { - log.Info("receiver ignores filesystem") - continue - } - log.WithError(err).Error("receiver error") - return early(err) - } - } else { - rfsvs = []*FilesystemVersion{} - } - - path, conflict := IncrementalPath(rfsvs, sfsvs) - if conflict != nil { - var msg string - path, msg = resolveConflict(conflict) // no shadowing allowed! - if path != nil { - log.WithField("conflict", conflict).Info("conflict") - log.WithField("resolution", msg).Info("automatically resolved") - } else { - log.WithField("conflict", conflict).Error("conflict") - log.WithField("problem", msg).Error("cannot resolve conflict") - } - } - if path == nil { - plan.addWork(newReplicateFSWorkWithConflict(fs, conflict)) - continue - } - - w := newReplicateFSWork(fs) - if len(path) == 1 { - step := newReplicationStep(nil, path[0]) - w.AddStep(step) - } else { - for i := 0; i < len(path)-1; i++ { - step := newReplicationStep(path[i], path[i+1]) - w.AddStep(step) - } - } - plan.addWork(w) - - } - - return plan, tryRes{done: true} -}
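
plan.go drives a whole replication run as a state machine: Replication.Drive repeatedly calls doDrive for a single transition and logs each transition together with its duration. A minimal, self-contained sketch of that drive-loop pattern follows; the state names and types are simplified stand-ins, not the ones from replication.v2.

package main

import (
	"fmt"
	"time"
)

type state int

const (
	planning state = iota
	working
	completed
)

func (s state) String() string {
	return [...]string{"Planning", "Working", "Completed"}[s]
}

type machine struct{ s state }

// step performs exactly one state transition; the surrounding loop
// owns logging and decides when the machine is done.
func (m *machine) step() {
	switch m.s {
	case planning:
		m.s = working
	case working:
		m.s = completed
	}
}

// drive mirrors the shape of Replication.Drive: loop until a terminal
// state, logging every transition with its duration.
func (m *machine) drive() {
	for m.s != completed {
		pre, preTime := m.s, time.Now()
		m.step()
		fmt.Printf("transition %s => %s (%s)\n", pre, m.s, time.Since(preTime))
	}
}

func main() {
	(&machine{}).drive()
}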
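
FSReplicationState is declared with 1 << iota instead of plain iota so that several states form a bit set and can be tested with a single AND, as in fsState&(FSPermanentError|FSCompleted) != 0 in doDrive. A short sketch of that idiom with hypothetical state names:

package main

import "fmt"

// Hypothetical per-filesystem states as a bit set, mirroring the
// 1<<iota declaration of FSReplicationState.
type fsState int

const (
	queued fsState = 1 << iota
	active
	retryWait
	permanentError
	completed
)

// isTerminal tests membership in a set of states with one AND,
// like fsState&(FSPermanentError|FSCompleted) != 0 in plan.go.
func isTerminal(s fsState) bool {
	return s&(permanentError|completed) != 0
}

func main() {
	fmt.Println(isTerminal(active))         // false
	fmt.Println(isTerminal(completed))      // true
	fmt.Println(isTerminal(permanentError)) // true
}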
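
In the Working state, doDrive re-sorts r.pending on every pass: filesystems in FSQueued come before those in FSRetry, queued ones are ordered by the creation date of their next step, retrying ones by retryAt. The following trimmed-down sketch reproduces just that ordering over an illustrative struct, not the actual FSReplication type.

package main

import (
	"fmt"
	"sort"
	"time"
)

// item stands in for *FSReplication: either queued with a next step,
// or waiting for a retry at a known time.
type item struct {
	name     string
	queued   bool // false: waiting for retry
	nextStep time.Time
	retryAt  time.Time
}

func main() {
	now := time.Now()
	pending := []item{
		{name: "tank/b", queued: false, retryAt: now.Add(5 * time.Second)},
		{name: "tank/a", queued: true, nextStep: now.Add(-1 * time.Hour)},
		{name: "tank/c", queued: true, nextStep: now.Add(-2 * time.Hour)},
	}
	sort.Slice(pending, func(i, j int) bool {
		a, b := pending[i], pending[j]
		prio := func(x item) int { // queued filesystems go first
			if x.queued {
				return 0
			}
			return 1
		}
		if prio(a) != prio(b) {
			return prio(a) < prio(b)
		}
		if a.queued { // oldest outstanding snapshot first
			return a.nextStep.Before(b.nextStep)
		}
		return a.retryAt.Before(b.retryAt) // earliest retry first
	})
	for _, it := range pending {
		fmt.Println(it.name) // tank/c, tank/a, tank/b
	}
}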
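
tryBuildPlan translates the version path returned by IncrementalPath into FSReplicationSteps: a one-element path becomes a single full send (from == nil), a longer path becomes incremental sends between adjacent versions. A sketch of that translation with placeholder types:

package main

import "fmt"

type version string

// step stands in for FSReplicationStep; an empty from means a full
// send, like from == nil in FSReplicationBuilder.AddStep.
type step struct{ from, to version }

func stepsFromPath(path []version) []step {
	if len(path) == 0 {
		return nil
	}
	if len(path) == 1 {
		return []step{{to: path[0]}} // full send of the only version
	}
	steps := make([]step, 0, len(path)-1)
	for i := 0; i < len(path)-1; i++ {
		steps = append(steps, step{from: path[i], to: path[i+1]}) // incremental send
	}
	return steps
}

func main() {
	fmt.Println(stepsFromPath([]version{"@snap3"}))
	fmt.Println(stepsFromPath([]version{"@snap1", "@snap2", "@snap3"}))
}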
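
FSReplicationStep.do classifies endpoint errors: io.EOF, io.ErrUnexpectedEOF, io.ErrClosedPipe and net.Error failures send the step to StepRetry, everything else to StepPermanentError. A compact sketch of that classification using a simplified state type:

package main

import (
	"errors"
	"fmt"
	"io"
	"net"
)

type stepState int

const (
	stepRetry stepState = iota
	stepPermanentError
)

// classify mirrors the error handling in FSReplicationStep.do:
// connection-shaped failures are retried, everything else is permanent.
func classify(err error) stepState {
	switch err {
	case io.EOF, io.ErrUnexpectedEOF, io.ErrClosedPipe:
		return stepRetry
	}
	if _, ok := err.(net.Error); ok {
		return stepRetry
	}
	return stepPermanentError
}

func main() {
	fmt.Println(classify(io.EOF) == stepRetry)                          // true
	fmt.Println(classify(&net.OpError{Op: "read"}) == stepRetry)        // true
	fmt.Println(classify(errors.New("zfs: dataset busy")) == stepRetry) // false
}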
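
config_job_pull.go wires SIGUSR2 into the new retryNow channel that is passed to replication.Replicate, so an operator can skip the retry wait by signalling the daemon. The sketch below shows a variant of that wiring (Unix-only, not the exact code from this diff): it stops the forwarding goroutine with an explicit done channel rather than by closing the signal channel, and uses a non-blocking send so at most one retry is queued.

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	usr2 := make(chan os.Signal, 1)
	signal.Notify(usr2, syscall.SIGUSR2)
	defer signal.Stop(usr2)

	// Buffered so a retry request is remembered even if nobody is
	// selecting on it right now, and the forwarder never blocks.
	retryNow := make(chan struct{}, 1)
	done := make(chan struct{})
	defer close(done)
	go func() {
		for {
			select {
			case <-usr2:
				select {
				case retryNow <- struct{}{}:
				default: // a retry is already queued
				}
			case <-done:
				return
			}
		}
	}()

	// Stand-in for the WorkingWait / PlanningError states: wait for the
	// retry timer or for an operator-triggered retry, whichever is first.
	select {
	case <-time.After(10 * time.Second):
		fmt.Println("retrying after timer")
	case <-retryNow:
		fmt.Println("retrying early due to SIGUSR2")
	}
}

With this in place, kill -USR2 <daemon pid> unblocks the waiting select immediately instead of waiting out the timer.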
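
replication.v2/replication.go keeps the context-logger pattern: ContextWithLogger stores the logger under an unexported context key, and getLogger falls back to a null logger when none is attached, so library code never has to nil-check. A generic sketch of the pattern with a stand-in logger interface (not zrepl/logger):

package main

import (
	"context"
	"log"
	"os"
)

type contextKey int

const contextKeyLog contextKey = 0

// logger is a stand-in for the zrepl/logger.Logger alias used in replication.v2.
type logger interface {
	Printf(format string, args ...interface{})
}

type nullLogger struct{}

func (nullLogger) Printf(string, ...interface{}) {}

func withLogger(ctx context.Context, l logger) context.Context {
	return context.WithValue(ctx, contextKeyLog, l)
}

// getLogger never returns nil: callers that did not attach a logger
// get a no-op implementation instead.
func getLogger(ctx context.Context) logger {
	if l, ok := ctx.Value(contextKeyLog).(logger); ok {
		return l
	}
	return nullLogger{}
}

func main() {
	ctx := withLogger(context.Background(), log.New(os.Stderr, "repl: ", 0))
	getLogger(ctx).Printf("attached logger is used")
	getLogger(context.Background()).Printf("this goes to the null logger")
}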