diff --git a/cmd/config_job_local.go b/cmd/config_job_local.go index bfdb26d..109bdb3 100644 --- a/cmd/config_job_local.go +++ b/cmd/config_job_local.go @@ -147,10 +147,9 @@ outer: { log := pullCtx.Value(contextKeyLog).(Logger) log.Debug("replicating from lhs to rhs") - puller := Puller{j.replTask, local, log, j.Mapping, j.InitialReplPolicy} - if err := puller.Pull(); err != nil { - log.WithError(err).Error("error replicating lhs to rhs") - } + puller := Puller{j.replTask, local, j.Mapping, j.InitialReplPolicy} + puller.Pull() + // use a ctx as soon as Pull gains ctx support select { case <-ctx.Done(): diff --git a/cmd/config_job_pull.go b/cmd/config_job_pull.go index 18ea7ab..09bc59b 100644 --- a/cmd/config_job_pull.go +++ b/cmd/config_job_pull.go @@ -123,11 +123,10 @@ start: log.Info("starting pull") - pullLog := log.WithField(logTaskField, "pull") - puller := Puller{j.task, client, pullLog, j.Mapping, j.InitialReplPolicy} - if err = puller.Pull(); err != nil { - log.WithError(err).Error("error doing pull") - } + j.task.Enter("pull") + puller := Puller{j.task, client, j.Mapping, j.InitialReplPolicy} + puller.Pull() + j.task.Finish() closeRPCWithTimeout(log, client, time.Second*10, "") diff --git a/cmd/replication.go b/cmd/replication.go index 9825250..d4cb5bc 100644 --- a/cmd/replication.go +++ b/cmd/replication.go @@ -8,7 +8,6 @@ import ( "bytes" "encoding/json" "github.com/zrepl/zrepl/rpc" - "github.com/zrepl/zrepl/util" "github.com/zrepl/zrepl/zfs" ) @@ -18,8 +17,6 @@ func (a localPullACL) Filter(p *zfs.DatasetPath) (pass bool, err error) { return true, nil } -const LOCAL_TRANSPORT_IDENTITY string = "local" - const DEFAULT_INITIAL_REPL_POLICY = InitialReplPolicyMostRecent type InitialReplPolicy string @@ -55,60 +52,268 @@ func closeRPCWithTimeout(log Logger, remote rpc.RPCClient, timeout time.Duration type Puller struct { task *Task Remote rpc.RPCClient - Log Logger Mapping DatasetMapping InitialReplPolicy InitialReplPolicy } -func (pull *Puller) Pull() (err 
error) { +type remoteLocalMapping struct { + Remote *zfs.DatasetPath + Local *zfs.DatasetPath +} - remote := pull.Remote - log := pull.Log +func (p *Puller) getRemoteFilesystems() (rfs []*zfs.DatasetPath, ok bool) { + p.task.Enter("fetch_remote_fs_list") + defer p.task.Finish() - log.Info("request remote filesystem list") fsr := FilesystemRequest{} - var remoteFilesystems []*zfs.DatasetPath - if err = remote.Call("FilesystemRequest", &fsr, &remoteFilesystems); err != nil { - return + if err := p.Remote.Call("FilesystemRequest", &fsr, &rfs); err != nil { + p.task.Log().WithError(err).Error("cannot fetch remote filesystem list") + return nil, false } + return rfs, true +} - log.Debug("map remote filesystems to local paths and determine order for per-filesystem sync") - type RemoteLocalMapping struct { - Remote *zfs.DatasetPath - Local *zfs.DatasetPath - } - replMapping := make(map[string]RemoteLocalMapping, len(remoteFilesystems)) - localTraversal := zfs.NewDatasetPathForest() +func (p *Puller) buildReplMapping(remoteFilesystems []*zfs.DatasetPath) (replMapping map[string]remoteLocalMapping, ok bool) { + p.task.Enter("build_repl_mapping") + defer p.task.Finish() + + replMapping = make(map[string]remoteLocalMapping, len(remoteFilesystems)) for fs := range remoteFilesystems { var err error var localFs *zfs.DatasetPath - localFs, err = pull.Mapping.Map(remoteFilesystems[fs]) + localFs, err = p.Mapping.Map(remoteFilesystems[fs]) if err != nil { err := fmt.Errorf("error mapping %s: %s", remoteFilesystems[fs], err) - log.WithError(err).WithField(logMapFromField, remoteFilesystems[fs]).Error("cannot map") - return err + p.task.Log().WithError(err).WithField(logMapFromField, remoteFilesystems[fs]).Error("cannot map") + return nil, false } if localFs == nil { continue } - log.WithField(logMapFromField, remoteFilesystems[fs].ToString()). + p.task.Log().WithField(logMapFromField, remoteFilesystems[fs].ToString()). 
WithField(logMapToField, localFs.ToString()).Debug("mapping") - m := RemoteLocalMapping{remoteFilesystems[fs], localFs} + m := remoteLocalMapping{remoteFilesystems[fs], localFs} replMapping[m.Local.ToString()] = m + } + return replMapping, true +} + +// returns true if the receiving filesystem (local side) exists and can have child filesystems +func (p *Puller) replFilesystem(m remoteLocalMapping, localFilesystemState map[string]zfs.FilesystemState) (localExists bool) { + + p.task.Enter("repl_fs") + defer p.task.Finish() + var err error + remote := p.Remote + + log := p.task.Log(). + WithField(logMapFromField, m.Remote.ToString()). + WithField(logMapToField, m.Local.ToString()) + + log.Debug("examining local filesystem state") + localState, localExists := localFilesystemState[m.Local.ToString()] + var versions []zfs.FilesystemVersion + switch { + case !localExists: + log.Info("local filesystem does not exist") + case localState.Placeholder: + log.Info("local filesystem is marked as placeholder") + default: + log.Debug("local filesystem exists") + log.Debug("requesting local filesystem versions") + if versions, err = zfs.ZFSListFilesystemVersions(m.Local, nil); err != nil { + log.WithError(err).Error("cannot get local filesystem versions") + return false + } + } + + log.Info("requesting remote filesystem versions") + r := FilesystemVersionsRequest{ + Filesystem: m.Remote, + } + var theirVersions []zfs.FilesystemVersion + if err = remote.Call("FilesystemVersionsRequest", &r, &theirVersions); err != nil { + log.WithError(err).Error("cannot get remote filesystem versions") + log.Warn("stopping replication for all filesystems mapped as children of receiving filesystem") + return false + } + + log.Debug("computing diff between remote and local filesystem versions") + diff := zfs.MakeFilesystemDiff(versions, theirVersions) + log.WithField("diff", diff).Debug("diff between local and remote filesystem") + + if localState.Placeholder && diff.Conflict != zfs.ConflictAllRight
{ + panic("internal inconsistency: local placeholder implies ConflictAllRight") + } + + switch diff.Conflict { + case zfs.ConflictAllRight: + + log.WithField("replication_policy", p.InitialReplPolicy).Info("performing initial sync, following policy") + + if p.InitialReplPolicy != InitialReplPolicyMostRecent { + panic(fmt.Sprintf("policy '%s' not implemented", p.InitialReplPolicy)) + } + + snapsOnly := make([]zfs.FilesystemVersion, 0, len(diff.MRCAPathRight)) + for s := range diff.MRCAPathRight { + if diff.MRCAPathRight[s].Type == zfs.Snapshot { + snapsOnly = append(snapsOnly, diff.MRCAPathRight[s]) + } + } + + if len(snapsOnly) < 1 { + log.Warn("cannot perform initial sync: no remote snapshots") + return false + } + + r := InitialTransferRequest{ + Filesystem: m.Remote, + FilesystemVersion: snapsOnly[len(snapsOnly)-1], + } + + log.WithField("version", r.FilesystemVersion).Debug("requesting snapshot stream") + + var stream io.Reader + + if err = remote.Call("InitialTransferRequest", &r, &stream); err != nil { + log.WithError(err).Error("cannot request initial transfer") + return false + } + log.Debug("received initial transfer request response") + + log.Debug("invoke zfs receive") + recvArgs := []string{"-u"} + if localState.Placeholder { + log.Info("receive with forced rollback to replace placeholder filesystem") + recvArgs = append(recvArgs, "-F") + } + progressStream := p.task.ProgressUpdater(stream) + if err = zfs.ZFSRecv(m.Local, progressStream, recvArgs...); err != nil { + log.WithError(err).Error("cannot receive stream") + return false + } + log.Info("finished receiving stream") // TODO rx delta + + // TODO unify with recv path of ConflictIncremental + log.Debug("configuring properties of received filesystem") + if err = zfs.ZFSSet(m.Local, "readonly", "on"); err != nil { + log.WithError(err).Error("cannot set readonly property") + } + + log.Info("finished initial transfer") + return true + + case zfs.ConflictIncremental: + + if len(diff.IncrementalPath) < 2 
{ + log.Info("remote and local are in sync") + return true + } + + log.Info("following incremental path from diff") + for i := 0; i < len(diff.IncrementalPath)-1; i++ { + + from, to := diff.IncrementalPath[i], diff.IncrementalPath[i+1] + + log, _ := log.WithField(logIncFromField, from.Name).WithField(logIncToField, to.Name), 0 + + log.Debug("requesting incremental snapshot stream") + r := IncrementalTransferRequest{ + Filesystem: m.Remote, + From: from, + To: to, + } + var stream io.Reader + if err = remote.Call("IncrementalTransferRequest", &r, &stream); err != nil { + log.WithError(err).Error("cannot request incremental snapshot stream") + return false + } + + log.Debug("invoking zfs receive") + progressStream := p.task.ProgressUpdater(stream) + // TODO protect against malicious incremental stream + if err = zfs.ZFSRecv(m.Local, progressStream); err != nil { + log.WithError(err).Error("cannot receive stream") + return false + } + log.Info("finished incremental transfer") // TODO increment rx + + } + log.Info("finished following incremental path") // TODO path rx + return true + + case zfs.ConflictNoCommonAncestor: + fallthrough + case zfs.ConflictDiverged: + + var jsonDiff bytes.Buffer + if err := json.NewEncoder(&jsonDiff).Encode(diff); err != nil { + log.WithError(err).Error("cannot JSON-encode diff") + return false + } + + var problem, resolution string + + switch diff.Conflict { + case zfs.ConflictNoCommonAncestor: + problem = "remote and local filesystem have snapshots, but no common one" + resolution = "manually establish a common snapshot history" + case zfs.ConflictDiverged: + problem = "remote and local filesystem share a history but have diverged" + resolution = "perform manual replication or delete snapshots on the receiving " + + "side to establish an incremental replication path" + } + + log.WithField("diff", jsonDiff.String()). + WithField("problem", problem). + WithField("resolution", resolution).
+ Error("manual conflict resolution required") + + return false + + } + + panic("should not be reached") +} + +func (p *Puller) Pull() { + p.task.Enter("run") + defer p.task.Finish() + + p.task.Log().Info("request remote filesystem list") + remoteFilesystems, ok := p.getRemoteFilesystems() + if !ok { + return + } + + p.task.Log().Debug("map remote filesystems to local paths and determine order for per-filesystem sync") + replMapping, ok := p.buildReplMapping(remoteFilesystems) + if !ok { + return + } + + p.task.Log().Debug("build cache for already present local filesystem state") + p.task.Enter("cache_local_fs_state") + localFilesystemState, err := zfs.ZFSListFilesystemState() + p.task.Finish() + if err != nil { + p.task.Log().WithError(err).Error("cannot request local filesystem state") + return + } + + localTraversal := zfs.NewDatasetPathForest() + for _, m := range replMapping { localTraversal.Add(m.Local) } - log.Debug("build cache for already present local filesystem state") - localFilesystemState, err := zfs.ZFSListFilesystemState() - if err != nil { - log.WithError(err).Error("cannot request local filesystem state") - return err - } - - log.Info("start per-filesystem sync") + p.task.Log().Info("start per-filesystem sync") localTraversal.WalkTopDown(func(v zfs.DatasetPathVisit) bool { - log := log.WithField(logFSField, v.Path.ToString()) + p.task.Enter("tree_walk") + defer p.task.Finish() + + log := p.task.Log().WithField(logFSField, v.Path.ToString()) if v.FilledIn { if _, exists := localFilesystemState[v.Path.ToString()]; exists { @@ -117,7 +322,9 @@ func (pull *Puller) Pull() (err error) { return true } log.Debug("create placeholder filesystem") + p.task.Enter("create_placeholder") err = zfs.ZFSCreatePlaceholderFilesystem(v.Path) + p.task.Finish() if err != nil { log.Error("cannot create placeholder filesystem") return false @@ -130,189 +337,7 @@ func (pull *Puller) Pull() (err error) { panic("internal inconsistency: replMapping should contain mapping for any
path that was not filled in by WalkTopDown()") } - log = log.WithField(logMapToField, m.Remote.ToString()). - WithField(logMapFromField, m.Local.ToString()) - - log.Debug("examing local filesystem state") - localState, localExists := localFilesystemState[m.Local.ToString()] - var versions []zfs.FilesystemVersion - switch { - case !localExists: - log.Info("local filesystem does not exist") - case localState.Placeholder: - log.Info("local filesystem is marked as placeholder") - default: - log.Debug("local filesystem exists") - log.Debug("requesting local filesystem versions") - if versions, err = zfs.ZFSListFilesystemVersions(m.Local, nil); err != nil { - log.WithError(err).Error("cannot get local filesystem versions") - return false - } - } - - log.Info("requesting remote filesystem versions") - r := FilesystemVersionsRequest{ - Filesystem: m.Remote, - } - var theirVersions []zfs.FilesystemVersion - if err = remote.Call("FilesystemVersionsRequest", &r, &theirVersions); err != nil { - log.WithError(err).Error("cannot get remote filesystem versions") - log.Warn("stopping replication for all filesystems mapped as children of receiving filesystem") - return false - } - - log.Debug("computing diff between remote and local filesystem versions") - diff := zfs.MakeFilesystemDiff(versions, theirVersions) - log.WithField("diff", diff).Debug("diff between local and remote filesystem") - - if localState.Placeholder && diff.Conflict != zfs.ConflictAllRight { - panic("internal inconsistency: local placeholder implies ConflictAllRight") - } - - switch diff.Conflict { - case zfs.ConflictAllRight: - - log.WithField("replication_policy", pull.InitialReplPolicy).Info("performing initial sync, following policy") - - if pull.InitialReplPolicy != InitialReplPolicyMostRecent { - panic(fmt.Sprintf("policy '%s' not implemented", pull.InitialReplPolicy)) - } - - snapsOnly := make([]zfs.FilesystemVersion, 0, len(diff.MRCAPathRight)) - for s := range diff.MRCAPathRight { - if 
diff.MRCAPathRight[s].Type == zfs.Snapshot { - snapsOnly = append(snapsOnly, diff.MRCAPathRight[s]) - } - } - - if len(snapsOnly) < 1 { - log.Warn("cannot perform initial sync: no remote snapshots") - return false - } - - r := InitialTransferRequest{ - Filesystem: m.Remote, - FilesystemVersion: snapsOnly[len(snapsOnly)-1], - } - - log.WithField("version", r.FilesystemVersion).Debug("requesting snapshot stream") - - var stream io.Reader - - if err = remote.Call("InitialTransferRequest", &r, &stream); err != nil { - log.WithError(err).Error("cannot request initial transfer") - return false - } - log.Debug("received initial transfer request response") - - log.Debug("invoke zfs receive") - watcher := util.IOProgressWatcher{Reader: stream} - watcher.KickOff(1*time.Second, func(p util.IOProgress) { - log.WithField("total_rx", p.TotalRX).Info("progress on receive operation") - }) - - recvArgs := []string{"-u"} - if localState.Placeholder { - log.Info("receive with forced rollback to replace placeholder filesystem") - recvArgs = append(recvArgs, "-F") - } - - if err = zfs.ZFSRecv(m.Local, &watcher, recvArgs...); err != nil { - log.WithError(err).Error("canot receive stream") - return false - } - log.WithField("total_rx", watcher.Progress().TotalRX). 
- Info("finished receiving stream") - - log.Debug("configuring properties of received filesystem") - if err = zfs.ZFSSet(m.Local, "readonly", "on"); err != nil { - log.WithError(err).Error("cannot set readonly property") - } - - log.Info("finished initial transfer") - return true - - case zfs.ConflictIncremental: - - if len(diff.IncrementalPath) < 2 { - log.Info("remote and local are in sync") - return true - } - - log.Info("following incremental path from diff") - var pathRx uint64 - - for i := 0; i < len(diff.IncrementalPath)-1; i++ { - - from, to := diff.IncrementalPath[i], diff.IncrementalPath[i+1] - - log, _ := log.WithField(logIncFromField, from.Name).WithField(logIncToField, to.Name), 0 - - log.Debug("requesting incremental snapshot stream") - r := IncrementalTransferRequest{ - Filesystem: m.Remote, - From: from, - To: to, - } - var stream io.Reader - if err = remote.Call("IncrementalTransferRequest", &r, &stream); err != nil { - log.WithError(err).Error("cannot request incremental snapshot stream") - return false - } - - log.Debug("invoking zfs receive") - watcher := util.IOProgressWatcher{Reader: stream} - watcher.KickOff(1*time.Second, func(p util.IOProgress) { - log.WithField("total_rx", p.TotalRX).Info("progress on receive operation") - }) - - if err = zfs.ZFSRecv(m.Local, &watcher); err != nil { - log.WithError(err).Error("cannot receive stream") - return false - } - - totalRx := watcher.Progress().TotalRX - pathRx += totalRx - log.WithField("total_rx", totalRx).Info("finished incremental transfer") - - } - - log.WithField("total_rx", pathRx).Info("finished following incremental path") - return true - - case zfs.ConflictNoCommonAncestor: - fallthrough - case zfs.ConflictDiverged: - - var jsonDiff bytes.Buffer - if err := json.NewEncoder(&jsonDiff).Encode(diff); err != nil { - log.WithError(err).Error("cannot JSON-encode diff") - return false - } - - var problem, resolution string - - switch diff.Conflict { - case zfs.ConflictNoCommonAncestor: - problem 
= "remote and local filesystem have snapshots, but no common one" - resolution = "perform manual establish a common snapshot history" - case zfs.ConflictDiverged: - problem = "remote and local filesystem share a history but have diverged" - resolution = "perform manual replication or delete snapshots on the receiving" + - "side to establish an incremental replication parse" - } - - log.WithField("diff", jsonDiff.String()). - WithField("problem", problem). - WithField("resolution", resolution). - Error("manual conflict resolution required") - - return false - - } - - panic("should not be reached") - + return p.replFilesystem(m, localFilesystemState) }) return diff --git a/util/io.go b/util/io.go index 446b323..68ae286 100644 --- a/util/io.go +++ b/util/io.go @@ -3,7 +3,6 @@ package util import ( "io" "os" - "time" ) type ReadWriteCloserLogger struct { @@ -97,52 +96,3 @@ func (c *ChainedReader) Read(buf []byte) (n int, err error) { return } - -type IOProgress struct { - TotalRX uint64 -} - -type IOProgressCallback func(progress IOProgress) - -type IOProgressWatcher struct { - Reader io.Reader - callback IOProgressCallback - callbackTicker *time.Ticker - progress IOProgress - updateChannel chan int -} - -func (w *IOProgressWatcher) KickOff(callbackInterval time.Duration, callback IOProgressCallback) { - w.callback = callback - w.callbackTicker = time.NewTicker(callbackInterval) - w.updateChannel = make(chan int) - go func() { - outer: - for { - select { - case newBytes, more := <-w.updateChannel: - w.progress.TotalRX += uint64(newBytes) - if !more { - w.callbackTicker.Stop() - break outer - } - case <-w.callbackTicker.C: - w.callback(w.progress) - } - } - w.callback(w.progress) - }() -} - -func (w *IOProgressWatcher) Progress() IOProgress { - return w.progress -} - -func (w *IOProgressWatcher) Read(p []byte) (n int, err error) { - n, err = w.Reader.Read(p) - w.updateChannel <- n - if err != nil { - close(w.updateChannel) - } - return -}