From 0918ef681540d7aec8d141f79dee1e9ac06af752 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 2 May 2018 21:26:11 +0200 Subject: [PATCH] WIP: diffing and replication algorithm --- cmd/replication/diff.go | 116 ++++++++++++ cmd/replication/diff_test.go | 269 ++++++++++++++++++++++++++++ cmd/replication/replication.go | 209 +++++++++++++++++++++ cmd/replication/replication_test.go | 152 ++++++++++++++++ 4 files changed, 746 insertions(+) create mode 100644 cmd/replication/diff.go create mode 100644 cmd/replication/diff_test.go create mode 100644 cmd/replication/replication.go create mode 100644 cmd/replication/replication_test.go diff --git a/cmd/replication/diff.go b/cmd/replication/diff.go new file mode 100644 index 0000000..3494ba6 --- /dev/null +++ b/cmd/replication/diff.go @@ -0,0 +1,116 @@ +package replication + +import ( + "github.com/zrepl/zrepl/zfs" + "sort" +) + +type ConflictNoCommonAncestor struct { + SortedSenderVersions, SortedReceiverVersions []zfs.FilesystemVersion +} + +func (c *ConflictNoCommonAncestor) Error() string { + return "no common snapshot or suitable bookmark between sender and receiver" +} + +type ConflictDiverged struct { + SortedSenderVersions, SortedReceiverVersions []zfs.FilesystemVersion + CommonAncestor zfs.FilesystemVersion + SenderOnly, ReceiverOnly []zfs.FilesystemVersion +} + +func (c *ConflictDiverged) Error() string { + return "the receiver's latest snapshot is not present on sender" +} + +func SortVersionListByCreateTXGThenBookmarkLTSnapshot(fsvslice []zfs.FilesystemVersion) []zfs.FilesystemVersion { + lesser := func(s []zfs.FilesystemVersion) func(i, j int) bool { + return func(i, j int) bool { + if s[i].CreateTXG < s[j].CreateTXG { + return true + } + if s[i].CreateTXG == s[j].CreateTXG { + // Bookmark < Snapshot + return s[i].Type == zfs.Bookmark && s[j].Type == zfs.Snapshot + } + return false + } + } + if sort.SliceIsSorted(fsvslice, lesser(fsvslice)) { + return fsvslice + } + sorted := make([]zfs.FilesystemVersion, len(fsvslice)) + copy(sorted, fsvslice) + sort.Slice(sorted, lesser(sorted)) + return sorted +} + +// conflict may be a *ConflictDiverged or a *ConflictNoCommonAncestor +func IncrementalPath(receiver, sender []zfs.FilesystemVersion) (incPath []zfs.FilesystemVersion, conflict error) { + + if receiver == nil { + panic("receiver must not be nil") + } + if sender == nil { + panic("sender must not be nil") + } + + receiver = SortVersionListByCreateTXGThenBookmarkLTSnapshot(receiver) + sender = SortVersionListByCreateTXGThenBookmarkLTSnapshot(sender) + + if len(sender) == 0 { + return []zfs.FilesystemVersion{}, nil + } + + // Find most recent common ancestor by name, preferring snapshots over bookmarks + + mrcaRcv := len(receiver) - 1 + mrcaSnd := len(sender) - 1 + + for mrcaRcv >= 0 && mrcaSnd >= 0 { + if receiver[mrcaRcv].Guid == sender[mrcaSnd].Guid { + if mrcaSnd-1 >= 0 && sender[mrcaSnd-1].Guid == sender[mrcaSnd].Guid && sender[mrcaSnd-1].Type == zfs.Bookmark { + // prefer bookmarks over snapshots as the snapshot might go away sooner + mrcaSnd -= 1 + } + break + } + if receiver[mrcaRcv].CreateTXG < sender[mrcaSnd].CreateTXG { + mrcaSnd-- + } else { + mrcaRcv-- + } + } + + if mrcaRcv == -1 || mrcaSnd == -1 { + return nil, &ConflictNoCommonAncestor{ + SortedSenderVersions: sender, + SortedReceiverVersions: receiver, + } + } + + if mrcaRcv != len(receiver)-1 { + return nil, &ConflictDiverged{ + SortedSenderVersions: sender, + SortedReceiverVersions: receiver, + CommonAncestor: sender[mrcaSnd], + SenderOnly: sender[mrcaSnd+1:], + ReceiverOnly: receiver[mrcaRcv+1:], + } + } + + // incPath must not contain bookmarks except initial one, + incPath = make([]zfs.FilesystemVersion, 0, len(sender)) + incPath = append(incPath, sender[mrcaSnd]) + // it's ok if incPath[0] is a bookmark, but not the subsequent ones in the incPath + for i := mrcaSnd + 1; i < len(sender); i++ { + if sender[i].Type == zfs.Snapshot && incPath[len(incPath)-1].Guid != sender[i].Guid { + incPath = append(incPath, sender[i]) + } + } + if len(incPath) == 1 { + // nothing to do + incPath = incPath[1:] + } + return incPath, nil +} diff --git a/cmd/replication/diff_test.go b/cmd/replication/diff_test.go new file mode 100644 index 0000000..916b8b0 --- /dev/null +++ b/cmd/replication/diff_test.go @@ -0,0 +1,269 @@ +package replication_test + +import ( + "github.com/stretchr/testify/assert" + "github.com/zrepl/zrepl/cmd/replication" + "github.com/zrepl/zrepl/zfs" + "strconv" + "strings" + "testing" + "time" +) + +func fsvlist(fsv ...string) (r []zfs.FilesystemVersion) { + + r = make([]zfs.FilesystemVersion, len(fsv)) + for i, f := range fsv { + + // parse the id from fsvlist. it is used to derivce Guid,CreateTXG and Creation attrs + split := strings.Split(f, ",") + if len(split) != 2 { + panic("invalid fsv spec") + } + id, err := strconv.Atoi(split[1]) + if err != nil { + panic(err) + } + + if strings.HasPrefix(f, "#") { + r[i] = zfs.FilesystemVersion{ + Name: strings.TrimPrefix(f, "#"), + Type: zfs.Bookmark, + Guid: uint64(id), + CreateTXG: uint64(id), + Creation: time.Unix(0, 0).Add(time.Duration(id) * time.Second), + } + } else if strings.HasPrefix(f, "@") { + r[i] = zfs.FilesystemVersion{ + Name: strings.TrimPrefix(f, "@"), + Type: zfs.Snapshot, + Guid: uint64(id), + CreateTXG: uint64(id), + Creation: time.Unix(0, 0).Add(time.Duration(id) * time.Second), + } + } else { + panic("invalid character") + } + } + return +} + +type incPathResult struct { + incPath []zfs.FilesystemVersion + conflict error +} + +type IncrementalPathTest struct { + Msg string + Receiver, Sender []zfs.FilesystemVersion + ExpectIncPath []zfs.FilesystemVersion + ExpectNoCommonAncestor bool + ExpectDiverged *replication.ConflictDiverged + ExpectPanic bool +} + +func (tt *IncrementalPathTest) Test(t *testing.T) { + + t.Logf("test: %s", tt.Msg) + + if tt.ExpectPanic { + assert.Panics(t, func() { + replication.IncrementalPath(tt.Receiver, tt.Sender) + }) + return + } + + incPath, conflict := replication.IncrementalPath(tt.Receiver, tt.Sender) + + if tt.ExpectIncPath != nil { + assert.Nil(t, conflict) + assert.True(t, len(incPath) == 0 || len(incPath) >= 2) + assert.Equal(t, tt.ExpectIncPath, incPath) + return + } + if conflict == nil { + t.Logf("conflict is (unexpectly) \nincPath: %#v", incPath) + } + if tt.ExpectNoCommonAncestor { + assert.IsType(t, &replication.ConflictNoCommonAncestor{}, conflict) + // TODO check sorting + return + } + if tt.ExpectDiverged != nil { + if !assert.IsType(t, &replication.ConflictDiverged{}, conflict) { + return + } + c := conflict.(*replication.ConflictDiverged) + // TODO check sorting + assert.NotZero(t, c.CommonAncestor) + assert.NotEmpty(t, c.ReceiverOnly) + assert.Equal(t, tt.ExpectDiverged.ReceiverOnly, c.ReceiverOnly) + assert.Equal(t, tt.ExpectDiverged.SenderOnly, c.SenderOnly) + return + } + +} + +func TestIncrementalPlan_IncrementalSnapshots(t *testing.T) { + l := fsvlist + + tbl := []IncrementalPathTest{ + { + Msg: "basic functionality", + Receiver: l("@a,1", "@b,2"), + Sender: l("@a,1", "@b,2", "@c,3", "@d,4"), + ExpectIncPath: l("@b,2", "@c,3", "@d,4"), + }, + { + Msg: "no snaps on receiver yields no common ancestor", + Receiver: l(), + Sender: l("@a,1"), + ExpectNoCommonAncestor: true, + }, + { + Msg: "no snapshots on sender yields empty incremental path", + Receiver: l(), + Sender: l(), + ExpectIncPath: l(), + }, + { + Msg: "nothing to do yields empty incremental path", + Receiver: l("@a,1"), + Sender: l("@a,1"), + ExpectIncPath: l(), + }, + { + Msg: "drifting apart", + Receiver: l("@a,1", "@b,2"), + Sender: l("@c,3", "@d,4"), + ExpectNoCommonAncestor: true, + }, + { + Msg: "different snapshots on sender and receiver", + Receiver: l("@a,1", "@c,2"), + Sender: l("@a,1", "@b,3"), + ExpectDiverged: &replication.ConflictDiverged{ + CommonAncestor: l("@a,1")[0], + SenderOnly: l("@b,3"), + ReceiverOnly: l("@c,2"), + }, + }, + { + Msg: "snapshot on receiver not present on sender", + Receiver: l("@a,1", "@b,2"), + Sender: l("@a,1"), + ExpectDiverged: &replication.ConflictDiverged{ + CommonAncestor: l("@a,1")[0], + SenderOnly: l(), + ReceiverOnly: l("@b,2"), + }, + }, + { + Msg: "gaps before most recent common ancestor do not matter", + Receiver: l("@a,1", "@b,2", "@c,3"), + Sender: l("@a,1", "@c,3", "@d,4"), + ExpectIncPath: l("@c,3", "@d,4"), + }, + } + + for _, test := range tbl { + test.Test(t) + } + +} + +func TestIncrementalPlan_BookmarksSupport(t *testing.T) { + l := fsvlist + + tbl := []IncrementalPathTest{ + { + Msg: "bookmarks are used", + Receiver: l("@a,1"), + Sender: l("#a,1", "@b,2"), + ExpectIncPath: l("#a,1", "@b,2"), + }, + { + Msg: "boomarks are stripped from incPath (cannot send incrementally)", + Receiver: l("@a,1"), + Sender: l("#a,1", "#b,2", "@c,3"), + ExpectIncPath: l("#a,1", "@c,3"), + }, + { + Msg: "bookmarks are preferred over snapshots for start of incPath", + Receiver: l("@a,1"), + Sender: l("#a,1", "@a,1", "@b,2"), + ExpectIncPath: l("#a,1", "@b,2"), + }, + { + Msg: "bookmarks are preferred over snapshots for start of incPath (regardless of order)", + Receiver: l("@a,1"), + Sender: l("@a,1", "#a,1", "@b,2"), + ExpectIncPath: l("#a,1", "@b,2"), + }, + } + + for _, test := range tbl { + test.Test(t) + } + +} + +func TestSortVersionListByCreateTXGThenBookmarkLTSnapshot(t *testing.T) { + + type Test struct { + Msg string + Input, Output []zfs.FilesystemVersion + } + + l := fsvlist + + tbl := []Test{ + { + "snapshot sorting already sorted", + l("@a,1", "@b,2"), + l("@a,1", "@b,2"), + }, + { + "bookmark sorting already sorted", + l("#a,1", "#b,2"), + l("#a,1", "#b,2"), + }, + { + "snapshot sorting", + l("@b,2", "@a,1"), + l("@a,1", "@b,2"), + }, + { + "bookmark sorting", + l("#b,2", "#a,1"), + l("#a,1", "#b,2"), + }, + } + + for _, test := range tbl { + t.Logf("test: %s", test.Msg) + inputlen := len(test.Input) + sorted := replication.SortVersionListByCreateTXGThenBookmarkLTSnapshot(test.Input) + if len(sorted) != inputlen { + t.Errorf("lenghts of input and output do not match: %d vs %d", inputlen, len(sorted)) + continue + } + if !assert.Equal(t, test.Output, sorted) { + continue + } + last := sorted[0] + for _, s := range sorted[1:] { + if s.CreateTXG < last.CreateTXG { + t.Errorf("must be sorted ascending, got:\n\t%#v", sorted) + break + } + if s.CreateTXG == last.CreateTXG { + if last.Type == zfs.Bookmark && s.Type != zfs.Snapshot { + t.Errorf("snapshots must come after bookmarks") + } + } + last = s + } + } + +} diff --git a/cmd/replication/replication.go b/cmd/replication/replication.go new file mode 100644 index 0000000..1759560 --- /dev/null +++ b/cmd/replication/replication.go @@ -0,0 +1,209 @@ +package replication + +import ( + "context" + "github.com/zrepl/zrepl/zfs" + "io" +) + +type ReplicationEndpoint interface { + // Does not include placeholder filesystems + ListFilesystems() ([]Filesystem, error) + ListFilesystemVersions(fs string) ([]zfs.FilesystemVersion, error) // fix depS + Sender + Receiver +} + +type Filesystem struct { + Path string + ResumeToken string +} + +type FilteredError struct{ fs string } + +func (f FilteredError) Error() string { return "endpoint does not allow access to filesystem " + f.fs } + +type SendRequest struct { + Filesystem string + From, To string + // If ResumeToken is not empty, the resume token that CAN be tried for 'zfs send' by the sender + // If it does not work, the sender SHOULD clear the resume token on their side + // and use From and To instead + // If ResumeToken is not empty, the GUIDs of From and To + // MUST correspond to those encoded in the ResumeToken. + // Otherwise, the Sender MUST return an error. + ResumeToken string + Compress bool + Dedup bool +} + +type SendResponse struct { + Properties zfs.ZFSProperties // fix dep + Stream io.Reader +} + +type ReceiveRequest struct { + Filesystem string + // The resume token used by the sending side. + // The receiver MUST discard the saved state on their side if ResumeToken + // does not match the zfs property of Filesystem on their side. + ResumeToken string +} + +type ReplicationMode int + +const ( + ReplicationModePull ReplicationMode = iota + ReplicationModePush +) + +type EndpointPair struct { + a, b ReplicationEndpoint + m ReplicationMode +} + +func NewEndpointPairPull(sender, receiver ReplicationEndpoint) EndpointPair { + return EndpointPair{sender, receiver, ReplicationModePull} +} + +func NewEndpointPairPush(sender, receiver ReplicationEndpoint) EndpointPair { + return EndpointPair{receiver, sender, ReplicationModePush} +} + +func (p EndpointPair) Sender() ReplicationEndpoint { + switch p.m { + case ReplicationModePull: + return p.a + case ReplicationModePush: + return p.b + } + panic("should not be reached") + return nil +} + +func (p EndpointPair) Receiver() ReplicationEndpoint { + switch p.m { + case ReplicationModePull: + return p.b + case ReplicationModePush: + return p.a + } + panic("should not be reached") + return nil +} + +func (p EndpointPair) Mode() ReplicationMode { + return p.m +} + +func Replicate(ctx context.Context, ep EndpointPair, ipr IncrementalPathReplicator) { + + sfss, err := ep.Sender().ListFilesystems() + if err != nil { + // log error + return + } + + for _, fs := range sfss { + sfsvs, err := ep.Sender().ListFilesystemVersions(fs.Path) + rfsvs, err := ep.Receiver().ListFilesystemVersions(fs.Path) + if err != nil { + if _, ok := err.(FilteredError); ok { + // Remote does not map filesystem, don't try to tx it + continue + } + // log and ignore + continue + } + + path, conflict := IncrementalPath(rfsvs, sfsvs) + if conflict != nil { + // handle or ignore for now + continue + } + + ipr.Replicate(ctx, ep.Sender(), ep.Receiver(), NewCopier(), fs, path) + + } + +} + +type Sender interface { + Send(r SendRequest) (SendResponse, error) +} + +type Receiver interface { + Receive(r ReceiveRequest) (io.Writer, error) +} + +type Copier interface { + Copy(writer io.Writer, reader io.Reader) (int64, error) +} + +type copier struct{} + +func (copier) Copy(writer io.Writer, reader io.Reader) (int64, error) { + return io.Copy(writer, reader) +} + +func NewCopier() Copier { + return copier{} +} + +type IncrementalPathReplicator interface { + Replicate(ctx context.Context, sender Sender, receiver Receiver, copier Copier, fs Filesystem, path []zfs.FilesystemVersion) +} + +type incrementalPathReplicator struct{} + +func NewIncrementalPathReplicator() IncrementalPathReplicator { + return incrementalPathReplicator{} +} + +func (incrementalPathReplicator) Replicate(ctx context.Context, sender Sender, receiver Receiver, copier Copier, fs Filesystem, path []zfs.FilesystemVersion) { + + if len(path) == 0 { + // nothing to do + return + } + + usedResumeToken := false + +incrementalLoop: + for j := 0; j < len(path)-1; j++ { + rt := "" + if !usedResumeToken { + rt = fs.ResumeToken + usedResumeToken = true + } + sr := SendRequest{ + Filesystem: fs.Path, + From: path[j].String(), + To: path[j+1].String(), + ResumeToken: rt, + } + sres, err := sender.Send(sr) + if err != nil { + // handle and ignore + break incrementalLoop + } + // try to consume stream + + rr := ReceiveRequest{ + Filesystem: fs.Path, + ResumeToken: rt, + } + recvWriter, err := receiver.Receive(rr) + if err != nil { + // handle and ignore + break incrementalLoop + } + _, err = copier.Copy(recvWriter, sres.Stream) + if err != nil { + // handle and ignore + break incrementalLoop + } + + // handle properties from sres + } +} diff --git a/cmd/replication/replication_test.go b/cmd/replication/replication_test.go new file mode 100644 index 0000000..424c554 --- /dev/null +++ b/cmd/replication/replication_test.go @@ -0,0 +1,152 @@ +package replication_test + +import ( + "context" + "github.com/stretchr/testify/assert" + "github.com/zrepl/zrepl/cmd/replication" + "github.com/zrepl/zrepl/zfs" + "io" + "testing" +) + +type IncrementalPathSequenceStep struct { + SendRequest replication.SendRequest + SendResponse replication.SendResponse + SendError error + ReceiveRequest replication.ReceiveRequest + ReceiveWriter io.Writer + ReceiveError error +} + +type MockIncrementalPathRecorder struct { + T *testing.T + Sequence []IncrementalPathSequenceStep + Pos int +} + +func (m *MockIncrementalPathRecorder) Receive(r replication.ReceiveRequest) (io.Writer, error) { + if m.Pos >= len(m.Sequence) { + m.T.Fatal("unexpected Receive") + } + i := m.Sequence[m.Pos] + m.Pos++ + if !assert.Equal(m.T, i.ReceiveRequest, r) { + m.T.FailNow() + } + return i.ReceiveWriter, i.ReceiveError +} + +func (m *MockIncrementalPathRecorder) Send(r replication.SendRequest) (replication.SendResponse, error) { + if m.Pos >= len(m.Sequence) { + m.T.Fatal("unexpected Send") + } + i := m.Sequence[m.Pos] + m.Pos++ + if !assert.Equal(m.T, i.SendRequest, r) { + m.T.FailNow() + } + return i.SendResponse, i.SendError +} + +func (m *MockIncrementalPathRecorder) Finished() bool { + return m.Pos == len(m.Sequence) +} + +type DiscardCopier struct{} + +func (DiscardCopier) Copy(writer io.Writer, reader io.Reader) (int64, error) { + return 0, nil +} + +type IncrementalPathReplicatorTest struct { + Msg string + Filesystem replication.Filesystem + Path []zfs.FilesystemVersion + Steps []IncrementalPathSequenceStep +} + +func (test *IncrementalPathReplicatorTest) Test(t *testing.T) { + + t.Log(test.Msg) + + rec := &MockIncrementalPathRecorder{ + T: t, + Sequence: test.Steps, + } + + ipr := replication.NewIncrementalPathReplicator() + ipr.Replicate( + context.TODO(), + rec, + rec, + DiscardCopier{}, + test.Filesystem, + test.Path, + ) + + assert.True(t, rec.Finished()) + +} + +func TestIncrementalPathReplicator_Replicate(t *testing.T) { + + tbl := []IncrementalPathReplicatorTest{ + { + Msg: "generic happy place with resume token", + Filesystem: replication.Filesystem{ + Path: "foo/bar", + ResumeToken: "blafoo", + }, + Path: fsvlist("@a,1", "@b,2", "@c,3"), + Steps: []IncrementalPathSequenceStep{ + { + SendRequest: replication.SendRequest{ + Filesystem: "foo/bar", + From: "@a,1", + To: "@b,2", + ResumeToken: "blafoo", + }, + }, + { + ReceiveRequest: replication.ReceiveRequest{ + Filesystem: "foo/bar", + ResumeToken: "blafoo", + }, + }, + { + SendRequest: replication.SendRequest{ + Filesystem: "foo/bar", + From: "@b,2", + To: "@c,3", + }, + }, + { + ReceiveRequest: replication.ReceiveRequest{ + Filesystem: "foo/bar", + }, + }, + }, + }, + { + Msg: "no action on empty sequence", + Filesystem: replication.Filesystem{ + Path: "foo/bar", + }, + Path: fsvlist(), + Steps: []IncrementalPathSequenceStep{}, + }, + { + Msg: "no action on invalid path", + Filesystem: replication.Filesystem{ + Path: "foo/bar", + }, + Path: fsvlist("@justone,1"), + Steps: []IncrementalPathSequenceStep{}, + }, + } + + for _, test := range tbl { + test.Test(t) + } + +}