diff --git a/cmd/replication.go b/cmd/replication.go index 41efb98..c48681f 100644 --- a/cmd/replication.go +++ b/cmd/replication.go @@ -75,6 +75,7 @@ func cmdPull(cmd *cobra.Command, args []string) { log.Printf("could not find pull job %s", args[0]) os.Exit(1) } + if err := jobPull(job, log); err != nil { log.Printf("error doing pull: %s", err) os.Exit(1) @@ -249,51 +250,46 @@ func doPull(pull PullContext) (err error) { remote := pull.Remote log := pull.Log + log.Printf("requesting remote filesystem list") fsr := rpc.FilesystemRequest{} var remoteFilesystems []*zfs.DatasetPath if remoteFilesystems, err = remote.FilesystemRequest(fsr); err != nil { return } - // build mapping (local->RemoteLocalMapping) + traversal datastructure + log.Printf("map remote filesystems to local paths and determine order for per-filesystem sync") type RemoteLocalMapping struct { Remote *zfs.DatasetPath Local *zfs.DatasetPath } replMapping := make(map[string]RemoteLocalMapping, len(remoteFilesystems)) localTraversal := zfs.NewDatasetPathForest() - { - - log.Printf("mapping using %#v\n", pull.Mapping) - for fs := range remoteFilesystems { - var err error - var localFs *zfs.DatasetPath - localFs, err = pull.Mapping.Map(remoteFilesystems[fs]) - if err != nil { - if err != NoMatchError { - log.Printf("error mapping %s: %#v\n", remoteFilesystems[fs], err) - return err - } - continue + for fs := range remoteFilesystems { + var err error + var localFs *zfs.DatasetPath + localFs, err = pull.Mapping.Map(remoteFilesystems[fs]) + if err != nil { + if err != NoMatchError { + err := fmt.Errorf("error mapping %s: %s", remoteFilesystems[fs], err) + log.Printf("%s", err) + return err } - m := RemoteLocalMapping{remoteFilesystems[fs], localFs} - replMapping[m.Local.ToString()] = m - localTraversal.Add(m.Local) + continue } - + log.Printf("%s => %s", remoteFilesystems[fs].ToString(), localFs.ToString()) + m := RemoteLocalMapping{remoteFilesystems[fs], localFs} + replMapping[m.Local.ToString()] = m + localTraversal.Add(m.Local) } - // get info about local filesystems + log.Printf("build cache for already present local filesystem state") localFilesystemState, err := zfs.ZFSListFilesystemState() if err != nil { - log.Printf("cannot get local filesystems map: %s", err) + log.Printf("error requesting local filesystem state: %s", err) return err } - log.Printf("remoteFilesystems: %#v\nreplMapping: %#v\n", remoteFilesystems, replMapping) - - // per fs sync, assume sorted in top-down order TODO - + log.Printf("start per-filesystem sync") localTraversal.WalkTopDown(func(v zfs.DatasetPathVisit) bool { if v.FilledIn { @@ -302,7 +298,7 @@ func doPull(pull PullContext) (err error) { // to know we can add child filesystems to it return true } - log.Printf("creating placeholder filesystem %s", v.Path) + log.Printf("creating placeholder filesystem %s", v.Path.ToString()) err = zfs.ZFSCreatePlaceholderFilesystem(v.Path) if err != nil { err = fmt.Errorf("aborting, cannot create placeholder filesystem %s: %s", v.Path, err) @@ -320,10 +316,8 @@ func doPull(pull PullContext) (err error) { log.Printf("[%s => %s]: %s", m.Remote.ToString(), m.Local.ToString(), fmt.Sprintf(format, args...)) } - log("mapping: %#v\n", m) - + log("examing local filesystem state") localState, localExists := localFilesystemState[m.Local.ToString()] - var versions []zfs.FilesystemVersion switch { case !localExists: @@ -331,24 +325,28 @@ func doPull(pull PullContext) (err error) { case localState.Placeholder: log("local filesystem is marked as placeholder") default: - log("local filesystem exists, retrieving versions for diff") + log("local filesystem exists") + log("requesting local filesystem versions") if versions, err = zfs.ZFSListFilesystemVersions(m.Local, nil); err != nil { - log("cannot get filesystem versions, stopping...: %v\n", m.Local.ToString(), m, err) + log("cannot get local filesystem versions: %s", err) return false } } + log("requesting remote filesystem versions") var theirVersions []zfs.FilesystemVersion theirVersions, err = remote.FilesystemVersionsRequest(rpc.FilesystemVersionsRequest{ Filesystem: m.Remote, }) if err != nil { - log("cannot fetch remote filesystem versions, stopping: %s", err) + log("error requesting remote filesystem versions: %s", err) + log("stopping replication for all filesystems mapped as children of %s", m.Local.ToString()) return false } + log("computing diff between remote and local filesystem versions") diff := zfs.MakeFilesystemDiff(versions, theirVersions) - log("diff: %#v\n", diff) + log("%s", diff) if localState.Placeholder && diff.Conflict != zfs.ConflictAllRight { panic("internal inconsistency: local placeholder implies ConflictAllRight") @@ -357,10 +355,10 @@ func doPull(pull PullContext) (err error) { switch diff.Conflict { case zfs.ConflictAllRight: - log("performing initial sync, following policy: %#v", pull.InitialReplPolicy) + log("performing initial sync, following policy: '%s'", pull.InitialReplPolicy) if pull.InitialReplPolicy != rpc.InitialReplPolicyMostRecent { - panic(fmt.Sprintf("policy %#v not implemented", pull.InitialReplPolicy)) + panic(fmt.Sprintf("policy '%s' not implemented", pull.InitialReplPolicy)) } snapsOnly := make([]zfs.FilesystemVersion, 0, len(diff.MRCAPathRight)) @@ -380,16 +378,16 @@ func doPull(pull PullContext) (err error) { FilesystemVersion: snapsOnly[len(snapsOnly)-1], } - log("requesting initial transfer") + log("requesting snapshot stream for %s", r.FilesystemVersion) var stream io.Reader if stream, err = remote.InitialTransferRequest(r); err != nil { - log("error initial transfer request, stopping...: %s", err) + log("error requesting initial transfer: %s", err) return false } + log("received initial transfer request response") - log("received initial transfer request response. zfs recv...") - + log("invoking zfs receive") watcher := util.IOProgressWatcher{Reader: stream} watcher.KickOff(1*time.Second, func(p util.IOProgress) { log("progress on receive operation: %v bytes received", p.TotalRX) @@ -402,13 +400,12 @@ func doPull(pull PullContext) (err error) { } if err = zfs.ZFSRecv(m.Local, &watcher, recvArgs...); err != nil { - log("error receiving stream, stopping...: %s", err) + log("error receiving stream: %s", err) return false } - log("received stream, %v bytes total", watcher.Progress().TotalRX) + log("finished receiving stream, %v bytes total", watcher.Progress().TotalRX) log("configuring properties of received filesystem") - if err = zfs.ZFSSet(m.Local, "readonly", "on"); err != nil { } @@ -423,7 +420,7 @@ func doPull(pull PullContext) (err error) { return true } - log("incremental transfers using path: %#v", diff.IncrementalPath) + log("following incremental path from diff") var pathRx uint64 for i := 0; i < len(diff.IncrementalPath)-1; i++ { @@ -431,31 +428,30 @@ func doPull(pull PullContext) (err error) { from, to := diff.IncrementalPath[i], diff.IncrementalPath[i+1] log := func(format string, args ...interface{}) { - log("[%s => %s]: %s", from.Name, to.Name, fmt.Sprintf(format, args...)) + log("[%v/%v][%s => %s]: %s", i+1, len(diff.IncrementalPath)-1, + from.Name, to.Name, fmt.Sprintf(format, args...)) } + log("requesting incremental snapshot stream") r := rpc.IncrementalTransferRequest{ Filesystem: m.Remote, From: from, To: to, } - log("requesting incremental transfer: %#v", r) - var stream io.Reader if stream, err = remote.IncrementalTransferRequest(r); err != nil { - log("error requesting incremental transfer, stopping...: %s", err.Error()) + log("error requesting incremental snapshot stream: %s", err) return false } - log("receving incremental transfer") - + log("invoking zfs receive") watcher := util.IOProgressWatcher{Reader: stream} watcher.KickOff(1*time.Second, func(p util.IOProgress) { log("progress on receive operation: %v bytes received", p.TotalRX) }) if err = zfs.ZFSRecv(m.Local, &watcher); err != nil { - log("error receiving stream, stopping...: %s", err) + log("error receiving stream: %s", err) return false } @@ -465,24 +461,36 @@ func doPull(pull PullContext) (err error) { } - log("finished incremental transfer path, %v bytes total", pathRx) + log("finished following incremental path, %v bytes total", pathRx) return true case zfs.ConflictNoCommonAncestor: - log("sender and receiver filesystem have snapshots, but no common one") + log("remote and local filesystem have snapshots, but no common one") log("perform manual replication to establish a common snapshot history") - log("sender snapshot list: %#v", diff.MRCAPathRight) - log("receiver snapshot list: %#v", diff.MRCAPathLeft) + log("remote versions:") + for _, v := range diff.MRCAPathRight { + log(" %s (GUID %v)", v, v.Guid) + } + log("local versions:") + for _, v := range diff.MRCAPathLeft { + log(" %s (GUID %v)", v, v.Guid) + } return false case zfs.ConflictDiverged: - log("sender and receiver filesystem share a history but have diverged") + log("remote and local filesystem share a history but have diverged") log("perform manual replication or delete snapshots on the receiving" + "side to establish an incremental replication parse") - log("sender-only snapshots: %#v", diff.MRCAPathRight) - log("receiver-only snapshots: %#v", diff.MRCAPathLeft) + log("remote-only versions:") + for _, v := range diff.MRCAPathRight { + log(" %s (GUID %v)", v, v.Guid) + } + log("local-only versions:") + for _, v := range diff.MRCAPathLeft { + log(" %s (GUID %v)", v, v.Guid) + } return false } diff --git a/rpc/rpc.go b/rpc/rpc.go index df7ba9b..9852545 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -8,7 +8,6 @@ import ( . "github.com/zrepl/zrepl/util" "github.com/zrepl/zrepl/zfs" "io" - "os" "reflect" "time" ) @@ -503,7 +502,6 @@ func (c ByteStreamRPC) CloseRequest(r CloseRequest) (err error) { if err = c.sendRequestReceiveHeader(r, ROK); err != nil { return } - os.Stderr.WriteString("close request conn.Close()") err = c.conn.Close() return } diff --git a/zfs/conflict_string.go b/zfs/conflict_string.go new file mode 100644 index 0000000..cf25248 --- /dev/null +++ b/zfs/conflict_string.go @@ -0,0 +1,16 @@ +// Code generated by "stringer -type=Conflict"; DO NOT EDIT. + +package zfs + +import "fmt" + +const _Conflict_name = "ConflictIncrementalConflictAllRightConflictNoCommonAncestorConflictDiverged" + +var _Conflict_index = [...]uint8{0, 19, 35, 59, 75} + +func (i Conflict) String() string { + if i < 0 || i >= Conflict(len(_Conflict_index)-1) { + return fmt.Sprintf("Conflict(%d)", i) + } + return _Conflict_name[_Conflict_index[i]:_Conflict_index[i+1]] +} diff --git a/zfs/diff.go b/zfs/diff.go index 07786c1..2421f64 100644 --- a/zfs/diff.go +++ b/zfs/diff.go @@ -17,13 +17,14 @@ func (l fsbyCreateTXG) Less(i, j int) bool { return l[i].CreateTXG < l[j].CreateTXG } +//go:generate stringer -type=Conflict type Conflict int const ( - ConflictIncremental = 0 // no conflict, incremental repl possible - ConflictAllRight = 1 // no conflict, initial repl possible - ConflictNoCommonAncestor = 2 - ConflictDiverged = 3 + ConflictIncremental Conflict = iota // no conflict, incremental repl possible + ConflictAllRight // no conflict, initial repl possible + ConflictNoCommonAncestor + ConflictDiverged ) /* The receiver (left) wants to know if the sender (right) has more recent versions @@ -73,6 +74,27 @@ type FilesystemDiff struct { MRCAPathRight []FilesystemVersion } +func (f FilesystemDiff) String() (str string) { + var b bytes.Buffer + + fmt.Fprintf(&b, "%s, ", f.Conflict) + + switch f.Conflict { + case ConflictIncremental: + fmt.Fprintf(&b, "incremental path length %v, common ancestor at %s", len(f.IncrementalPath)-1, f.IncrementalPath[0]) + case ConflictAllRight: + fmt.Fprintf(&b, "%v versions, most recent is %s", len(f.MRCAPathRight)-1, f.MRCAPathRight[len(f.MRCAPathRight)-1]) + case ConflictDiverged: + fmt.Fprintf(&b, "diverged at %s", f.MRCAPathRight[0]) // right always has at least one snap...? + case ConflictNoCommonAncestor: + fmt.Fprintf(&b, "no diff to show") + default: + fmt.Fprintf(&b, "unknown conflict type, likely a bug") + } + + return b.String() +} + // we must assume left and right are ordered ascendingly by ZFS_PROP_CREATETXG and that // names are unique (bas ZFS_PROP_GUID replacement) func MakeFilesystemDiff(left, right []FilesystemVersion) (diff FilesystemDiff) { diff --git a/zfs/versions.go b/zfs/versions.go index ec5593d..292d4ee 100644 --- a/zfs/versions.go +++ b/zfs/versions.go @@ -45,6 +45,10 @@ type FilesystemVersion struct { Creation time.Time } +func (v FilesystemVersion) String() string { + return fmt.Sprintf("%s%s", v.Type.DelimiterChar(), v.Name) +} + func (v FilesystemVersion) ToAbsPath(p *DatasetPath) string { var b bytes.Buffer b.WriteString(p.ToString())