WIP: diffing and replication algorithm

This commit is contained in:
Christian Schwarz 2018-05-02 21:26:11 +02:00
parent 181875a89b
commit 0918ef6815
4 changed files with 746 additions and 0 deletions

116
cmd/replication/diff.go Normal file
View File

@ -0,0 +1,116 @@
package replication
import (
"github.com/zrepl/zrepl/zfs"
"sort"
)
// ConflictNoCommonAncestor is the conflict returned by IncrementalPath when
// no version with a matching Guid exists on both sides.
type ConflictNoCommonAncestor struct {
	// SortedSenderVersions is the sender's version list after sorting.
	SortedSenderVersions []zfs.FilesystemVersion
	// SortedReceiverVersions is the receiver's version list after sorting.
	SortedReceiverVersions []zfs.FilesystemVersion
}

// Error implements the error interface with a fixed message.
func (c *ConflictNoCommonAncestor) Error() string {
	const msg = "no common snapshot or suitable bookmark between sender and receiver"
	return msg
}
// ConflictDiverged is the conflict returned by IncrementalPath when a common
// ancestor exists but it is not the receiver's most recent version.
type ConflictDiverged struct {
	// SortedSenderVersions is the sender's version list after sorting.
	SortedSenderVersions []zfs.FilesystemVersion
	// SortedReceiverVersions is the receiver's version list after sorting.
	SortedReceiverVersions []zfs.FilesystemVersion
	// CommonAncestor is the sender-side version chosen as the common ancestor.
	CommonAncestor zfs.FilesystemVersion
	// SenderOnly are the sender versions sorted after the common ancestor.
	SenderOnly []zfs.FilesystemVersion
	// ReceiverOnly are the receiver versions sorted after the common ancestor.
	ReceiverOnly []zfs.FilesystemVersion
}

// Error implements the error interface with a fixed message.
func (c *ConflictDiverged) Error() string {
	const msg = "the receiver's latest snapshot is not present on sender"
	return msg
}
// SortVersionListByCreateTXGThenBookmarkLTSnapshot orders versions ascending
// by CreateTXG; among versions with equal CreateTXG a bookmark is ordered
// before a snapshot.
//
// If fsvslice is already in that order it is returned as-is (the same slice,
// not a copy). Otherwise a sorted copy is returned and fsvslice is left
// untouched.
func SortVersionListByCreateTXGThenBookmarkLTSnapshot(fsvslice []zfs.FilesystemVersion) []zfs.FilesystemVersion {
	// before reports whether a must be ordered strictly ahead of b.
	before := func(a, b zfs.FilesystemVersion) bool {
		if a.CreateTXG != b.CreateTXG {
			return a.CreateTXG < b.CreateTXG
		}
		// Bookmark < Snapshot
		return a.Type == zfs.Bookmark && b.Type == zfs.Snapshot
	}

	alreadySorted := sort.SliceIsSorted(fsvslice, func(i, j int) bool {
		return before(fsvslice[i], fsvslice[j])
	})
	if alreadySorted {
		return fsvslice
	}

	out := append(make([]zfs.FilesystemVersion, 0, len(fsvslice)), fsvslice...)
	sort.Slice(out, func(i, j int) bool {
		return before(out[i], out[j])
	})
	return out
}
// IncrementalPath computes the list of sender versions that must be sent
// incrementally to bring the receiver up to date with the sender.
//
// Both lists are sorted with SortVersionListByCreateTXGThenBookmarkLTSnapshot
// before they are compared. Versions are matched by Guid.
//
// On success, incPath is either empty (nothing to do) or has at least two
// elements: incPath[0] is the common ancestor on the sender (possibly a
// bookmark) and all following elements are snapshots.
//
// conflict may be a *ConflictDiverged or a *ConflictNoCommonAncestor.
//
// IncrementalPath panics if receiver or sender is nil; pass an empty,
// non-nil slice for a side without versions.
func IncrementalPath(receiver, sender []zfs.FilesystemVersion) (incPath []zfs.FilesystemVersion, conflict error) {
	if receiver == nil {
		panic("receiver must not be nil")
	}
	if sender == nil {
		panic("sender must not be nil")
	}

	receiver = SortVersionListByCreateTXGThenBookmarkLTSnapshot(receiver)
	sender = SortVersionListByCreateTXGThenBookmarkLTSnapshot(sender)

	if len(sender) == 0 {
		return []zfs.FilesystemVersion{}, nil
	}

	// Find the most recent common ancestor by Guid, walking both sorted
	// lists from their newest entries towards the oldest.
	mrcaRcv := len(receiver) - 1
	mrcaSnd := len(sender) - 1
	for mrcaRcv >= 0 && mrcaSnd >= 0 {
		if receiver[mrcaRcv].Guid == sender[mrcaSnd].Guid {
			if mrcaSnd-1 >= 0 && sender[mrcaSnd-1].Guid == sender[mrcaSnd].Guid && sender[mrcaSnd-1].Type == zfs.Bookmark {
				// prefer bookmarks over snapshots as the snapshot might go away sooner
				mrcaSnd--
			}
			break
		}
		if receiver[mrcaRcv].CreateTXG < sender[mrcaSnd].CreateTXG {
			mrcaSnd--
		} else {
			mrcaRcv--
		}
	}

	if mrcaRcv == -1 || mrcaSnd == -1 {
		return nil, &ConflictNoCommonAncestor{
			SortedSenderVersions:   sender,
			SortedReceiverVersions: receiver,
		}
	}

	if mrcaRcv != len(receiver)-1 {
		// The chosen ancestor may be a bookmark that is directly followed by
		// the snapshot with the same Guid. That snapshot is the ancestor
		// itself and therefore must not be reported as sender-only.
		senderOnlyStart := mrcaSnd + 1
		for senderOnlyStart < len(sender) && sender[senderOnlyStart].Guid == sender[mrcaSnd].Guid {
			senderOnlyStart++
		}
		return nil, &ConflictDiverged{
			SortedSenderVersions:   sender,
			SortedReceiverVersions: receiver,
			CommonAncestor:         sender[mrcaSnd],
			SenderOnly:             sender[senderOnlyStart:],
			ReceiverOnly:           receiver[mrcaRcv+1:],
		}
	}

	// incPath must not contain bookmarks except the initial one.
	incPath = make([]zfs.FilesystemVersion, 0, len(sender))
	incPath = append(incPath, sender[mrcaSnd])
	// It is ok if incPath[0] is a bookmark, but not the subsequent entries.
	// Entries sharing the Guid of the previous path element are skipped so
	// the ancestor's own snapshot does not follow its bookmark.
	for i := mrcaSnd + 1; i < len(sender); i++ {
		if sender[i].Type == zfs.Snapshot && incPath[len(incPath)-1].Guid != sender[i].Guid {
			incPath = append(incPath, sender[i])
		}
	}
	if len(incPath) == 1 {
		// Only the ancestor is left: nothing to do. Keep the result a
		// non-nil empty slice.
		incPath = incPath[1:]
	}
	return incPath, nil
}

View File

@ -0,0 +1,269 @@
package replication_test
import (
"github.com/stretchr/testify/assert"
"github.com/zrepl/zrepl/cmd/replication"
"github.com/zrepl/zrepl/zfs"
"strconv"
"strings"
"testing"
"time"
)
// fsvlist builds filesystem versions from specs of the form "#name,id"
// (bookmark) or "@name,id" (snapshot).
//
// The numeric id after the comma is used for Guid and CreateTXG, and
// Creation is set to the Unix epoch plus id seconds. The Name keeps
// everything after the leading '#' or '@', including the ",id" suffix.
// Malformed specs cause a panic.
func fsvlist(fsv ...string) (r []zfs.FilesystemVersion) {
	r = make([]zfs.FilesystemVersion, len(fsv))
	for idx, spec := range fsv {
		parts := strings.Split(spec, ",")
		if len(parts) != 2 {
			panic("invalid fsv spec")
		}
		id, err := strconv.Atoi(parts[1])
		if err != nil {
			panic(err)
		}

		// Fields that are derived from the id are shared by both kinds.
		version := zfs.FilesystemVersion{
			Guid:      uint64(id),
			CreateTXG: uint64(id),
			Creation:  time.Unix(0, 0).Add(time.Duration(id) * time.Second),
		}

		switch {
		case strings.HasPrefix(spec, "#"):
			version.Name = strings.TrimPrefix(spec, "#")
			version.Type = zfs.Bookmark
		case strings.HasPrefix(spec, "@"):
			version.Name = strings.TrimPrefix(spec, "@")
			version.Type = zfs.Snapshot
		default:
			panic("invalid character")
		}
		r[idx] = version
	}
	return
}
// incPathResult pairs the two return values of replication.IncrementalPath.
// NOTE(review): not referenced anywhere in the visible test code.
type incPathResult struct {
	incPath  []zfs.FilesystemVersion
	conflict error
}
// IncrementalPathTest is one table entry for testing
// replication.IncrementalPath. Exactly one of the Expect* fields is meant to
// be set per entry; see the Test method for how each is evaluated.
type IncrementalPathTest struct {
	// Msg is logged at the start of the test.
	Msg string
	// Receiver is passed as the receiver argument of IncrementalPath.
	Receiver []zfs.FilesystemVersion
	// Sender is passed as the sender argument of IncrementalPath.
	Sender []zfs.FilesystemVersion
	// ExpectIncPath, if non-nil, is the expected conflict-free result.
	ExpectIncPath []zfs.FilesystemVersion
	// ExpectNoCommonAncestor expects a *replication.ConflictNoCommonAncestor.
	ExpectNoCommonAncestor bool
	// ExpectDiverged, if non-nil, expects a *replication.ConflictDiverged.
	ExpectDiverged *replication.ConflictDiverged
	// ExpectPanic expects the call to IncrementalPath to panic.
	ExpectPanic bool
}
// Test runs replication.IncrementalPath on the entry's Receiver and Sender
// and checks the outcome against the entry's expectation.
//
// Expectations are evaluated in this order: ExpectPanic, ExpectIncPath,
// ExpectNoCommonAncestor, ExpectDiverged. The first one that is set decides
// the checks that are performed.
func (tt *IncrementalPathTest) Test(t *testing.T) {

	t.Logf("test: %s", tt.Msg)

	if tt.ExpectPanic {
		assert.Panics(t, func() {
			replication.IncrementalPath(tt.Receiver, tt.Sender)
		})
		return
	}

	incPath, conflict := replication.IncrementalPath(tt.Receiver, tt.Sender)

	if tt.ExpectIncPath != nil {
		assert.Nil(t, conflict)
		// A path is either empty or consists of at least (from, to).
		assert.True(t, len(incPath) == 0 || len(incPath) >= 2)
		assert.Equal(t, tt.ExpectIncPath, incPath)
		return
	}
	if conflict == nil {
		t.Logf("conflict is (unexpectly) <nil>\nincPath: %#v", incPath)
	}
	if tt.ExpectNoCommonAncestor {
		assert.IsType(t, &replication.ConflictNoCommonAncestor{}, conflict)
		// TODO check sorting
		return
	}
	if tt.ExpectDiverged != nil {
		if !assert.IsType(t, &replication.ConflictDiverged{}, conflict) {
			return
		}
		c := conflict.(*replication.ConflictDiverged)
		// TODO check sorting
		assert.NotZero(t, c.CommonAncestor)
		// The table entries specify the expected ancestor, so verify it
		// instead of only checking that some ancestor was reported.
		assert.Equal(t, tt.ExpectDiverged.CommonAncestor, c.CommonAncestor)
		assert.NotEmpty(t, c.ReceiverOnly)
		assert.Equal(t, tt.ExpectDiverged.ReceiverOnly, c.ReceiverOnly)
		assert.Equal(t, tt.ExpectDiverged.SenderOnly, c.SenderOnly)
		return
	}

}
// TestIncrementalPlan_IncrementalSnapshots exercises IncrementalPath with
// snapshot-only version lists: the happy path, empty inputs, missing common
// ancestors and diverged histories.
func TestIncrementalPlan_IncrementalSnapshots(t *testing.T) {
	vs := fsvlist

	cases := []IncrementalPathTest{
		{
			Msg:           "basic functionality",
			Receiver:      vs("@a,1", "@b,2"),
			Sender:        vs("@a,1", "@b,2", "@c,3", "@d,4"),
			ExpectIncPath: vs("@b,2", "@c,3", "@d,4"),
		},
		{
			Msg:                    "no snaps on receiver yields no common ancestor",
			Receiver:               vs(),
			Sender:                 vs("@a,1"),
			ExpectNoCommonAncestor: true,
		},
		{
			Msg:           "no snapshots on sender yields empty incremental path",
			Receiver:      vs(),
			Sender:        vs(),
			ExpectIncPath: vs(),
		},
		{
			Msg:           "nothing to do yields empty incremental path",
			Receiver:      vs("@a,1"),
			Sender:        vs("@a,1"),
			ExpectIncPath: vs(),
		},
		{
			Msg:                    "drifting apart",
			Receiver:               vs("@a,1", "@b,2"),
			Sender:                 vs("@c,3", "@d,4"),
			ExpectNoCommonAncestor: true,
		},
		{
			Msg:      "different snapshots on sender and receiver",
			Receiver: vs("@a,1", "@c,2"),
			Sender:   vs("@a,1", "@b,3"),
			ExpectDiverged: &replication.ConflictDiverged{
				CommonAncestor: vs("@a,1")[0],
				SenderOnly:     vs("@b,3"),
				ReceiverOnly:   vs("@c,2"),
			},
		},
		{
			Msg:      "snapshot on receiver not present on sender",
			Receiver: vs("@a,1", "@b,2"),
			Sender:   vs("@a,1"),
			ExpectDiverged: &replication.ConflictDiverged{
				CommonAncestor: vs("@a,1")[0],
				SenderOnly:     vs(),
				ReceiverOnly:   vs("@b,2"),
			},
		},
		{
			Msg:           "gaps before most recent common ancestor do not matter",
			Receiver:      vs("@a,1", "@b,2", "@c,3"),
			Sender:        vs("@a,1", "@c,3", "@d,4"),
			ExpectIncPath: vs("@c,3", "@d,4"),
		},
	}

	for i := range cases {
		cases[i].Test(t)
	}
}
// TestIncrementalPlan_BookmarksSupport checks how IncrementalPath treats
// bookmarks on the sender: they may start the path but never appear later
// in it, and they win over a snapshot with the same id as path start.
func TestIncrementalPlan_BookmarksSupport(t *testing.T) {
	vs := fsvlist

	cases := []IncrementalPathTest{
		{
			Msg:           "bookmarks are used",
			Receiver:      vs("@a,1"),
			Sender:        vs("#a,1", "@b,2"),
			ExpectIncPath: vs("#a,1", "@b,2"),
		},
		{
			Msg:           "boomarks are stripped from incPath (cannot send incrementally)",
			Receiver:      vs("@a,1"),
			Sender:        vs("#a,1", "#b,2", "@c,3"),
			ExpectIncPath: vs("#a,1", "@c,3"),
		},
		{
			Msg:           "bookmarks are preferred over snapshots for start of incPath",
			Receiver:      vs("@a,1"),
			Sender:        vs("#a,1", "@a,1", "@b,2"),
			ExpectIncPath: vs("#a,1", "@b,2"),
		},
		{
			Msg:           "bookmarks are preferred over snapshots for start of incPath (regardless of order)",
			Receiver:      vs("@a,1"),
			Sender:        vs("@a,1", "#a,1", "@b,2"),
			ExpectIncPath: vs("#a,1", "@b,2"),
		},
	}

	for i := range cases {
		cases[i].Test(t)
	}
}
// TestSortVersionListByCreateTXGThenBookmarkLTSnapshot verifies that the sort
// helper returns the expected order and that its output satisfies the
// ordering invariant: ascending CreateTXG, and for equal CreateTXG no
// snapshot directly followed by a bookmark.
func TestSortVersionListByCreateTXGThenBookmarkLTSnapshot(t *testing.T) {

	type Test struct {
		Msg           string
		Input, Output []zfs.FilesystemVersion
	}

	l := fsvlist

	tbl := []Test{
		{
			"snapshot sorting already sorted",
			l("@a,1", "@b,2"),
			l("@a,1", "@b,2"),
		},
		{
			"bookmark sorting already sorted",
			l("#a,1", "#b,2"),
			l("#a,1", "#b,2"),
		},
		{
			"snapshot sorting",
			l("@b,2", "@a,1"),
			l("@a,1", "@b,2"),
		},
		{
			"bookmark sorting",
			l("#b,2", "#a,1"),
			l("#a,1", "#b,2"),
		},
		{
			"bookmark sorts before snapshot of same CreateTXG",
			l("@a,1", "#a,1"),
			l("#a,1", "@a,1"),
		},
	}

	for _, test := range tbl {
		t.Logf("test: %s", test.Msg)
		inputlen := len(test.Input)
		sorted := replication.SortVersionListByCreateTXGThenBookmarkLTSnapshot(test.Input)
		if len(sorted) != inputlen {
			t.Errorf("lenghts of input and output do not match: %d vs %d", inputlen, len(sorted))
			continue
		}
		if !assert.Equal(t, test.Output, sorted) {
			continue
		}
		if len(sorted) == 0 {
			// Nothing to compare pairwise; also avoids indexing sorted[0].
			continue
		}
		last := sorted[0]
		for _, s := range sorted[1:] {
			if s.CreateTXG < last.CreateTXG {
				t.Errorf("must be sorted ascending, got:\n\t%#v", sorted)
				break
			}
			// The violation is a snapshot directly followed by a bookmark
			// with the same CreateTXG, not two bookmarks in a row.
			if s.CreateTXG == last.CreateTXG && last.Type == zfs.Snapshot && s.Type == zfs.Bookmark {
				t.Errorf("snapshots must come after bookmarks")
			}
			last = s
		}
	}

}

View File

@ -0,0 +1,209 @@
package replication
import (
"context"
"github.com/zrepl/zrepl/zfs"
"io"
)
// ReplicationEndpoint is one side of a replication. It can enumerate its
// filesystems and their versions, and it embeds both Sender and Receiver so
// the same endpoint can be used in either role.
type ReplicationEndpoint interface {
	// Does not include placeholder filesystems
	ListFilesystems() ([]Filesystem, error)
	// ListFilesystemVersions returns the versions of the filesystem fs.
	// NOTE(review): the original "fix depS" marker presumably refers to the
	// dependency on package zfs in this signature — confirm.
	ListFilesystemVersions(fs string) ([]zfs.FilesystemVersion, error) // fix depS
	Sender
	Receiver
}
// Filesystem describes a filesystem as reported by
// ReplicationEndpoint.ListFilesystems.
type Filesystem struct {
	// Path identifies the filesystem; it is passed to
	// ListFilesystemVersions and used as SendRequest/ReceiveRequest.Filesystem.
	Path string
	// ResumeToken, if not empty, is offered on the first send/receive step
	// of an incremental replication of this filesystem.
	ResumeToken string
}
// FilteredError reports that an endpoint does not allow access to a
// filesystem.
type FilteredError struct {
	// fs is the filesystem that was denied; it is included in the message.
	fs string
}

// Error implements the error interface.
func (f FilteredError) Error() string {
	const prefix = "endpoint does not allow access to filesystem "
	return prefix + f.fs
}
// SendRequest asks a Sender for a stream of Filesystem between two versions.
type SendRequest struct {
	// Filesystem is the filesystem to send.
	Filesystem string
	// From is the version the incremental stream starts at.
	From string
	// To is the version the incremental stream ends at.
	To string
	// ResumeToken, if not empty, is a resume token that CAN be tried for
	// 'zfs send' by the sender. If it does not work, the sender SHOULD clear
	// the resume token on their side and use From and To instead.
	//
	// If ResumeToken is not empty, the GUIDs of From and To MUST correspond
	// to those encoded in the ResumeToken. Otherwise, the Sender MUST return
	// an error.
	ResumeToken string
	// Compress and Dedup are options for the send.
	// NOTE(review): their exact effect is up to the Sender implementation,
	// which is not part of this file.
	Compress bool
	Dedup    bool
}
// SendResponse is what a Sender returns for a SendRequest.
type SendResponse struct {
	// Properties accompany the stream.
	// NOTE(review): the original "fix dep" marker presumably refers to the
	// dependency on package zfs — confirm.
	Properties zfs.ZFSProperties // fix dep
	// Stream is the data to be copied into the receiver's writer.
	Stream io.Reader
}
// ReceiveRequest asks a Receiver to accept a stream for Filesystem.
type ReceiveRequest struct {
	// Filesystem is the filesystem the stream is received into.
	Filesystem string
	// The resume token used by the sending side.
	// The receiver MUST discard the saved state on their side if ResumeToken
	// does not match the zfs property of Filesystem on their side.
	ResumeToken string
}
// ReplicationMode records how an EndpointPair was constructed; it determines
// which of the pair's two endpoints acts as sender and which as receiver.
type ReplicationMode int

const (
	// ReplicationModePull is set by NewEndpointPairPull.
	ReplicationModePull ReplicationMode = iota
	// ReplicationModePush is set by NewEndpointPairPush.
	ReplicationModePush
)
// EndpointPair bundles the two endpoints of a replication together with the
// mode that decides their roles. Use the Sender and Receiver methods instead
// of accessing the fields directly.
type EndpointPair struct {
	// a is the sender in pull mode and the receiver in push mode.
	a ReplicationEndpoint
	// b is the receiver in pull mode and the sender in push mode.
	b ReplicationEndpoint
	// m is the mode the pair was constructed with.
	m ReplicationMode
}
// NewEndpointPairPull returns an EndpointPair in pull mode for the given
// sender and receiver.
func NewEndpointPairPull(sender, receiver ReplicationEndpoint) EndpointPair {
	return EndpointPair{
		a: sender,
		b: receiver,
		m: ReplicationModePull,
	}
}
// NewEndpointPairPush returns an EndpointPair in push mode for the given
// sender and receiver. In push mode the endpoints are stored in swapped
// slots compared to pull mode; Sender and Receiver account for that.
func NewEndpointPairPush(sender, receiver ReplicationEndpoint) EndpointPair {
	return EndpointPair{
		a: receiver,
		b: sender,
		m: ReplicationModePush,
	}
}
// Sender returns the endpoint that acts as sender for the pair's mode.
// It panics if the pair carries a mode other than ReplicationModePull or
// ReplicationModePush, which cannot happen for pairs built by the
// NewEndpointPair* constructors.
func (p EndpointPair) Sender() ReplicationEndpoint {
	switch p.m {
	case ReplicationModePull:
		return p.a
	case ReplicationModePush:
		return p.b
	}
	// panic is a terminating statement, so no trailing return is needed.
	panic("should not be reached")
}
// Receiver returns the endpoint that acts as receiver for the pair's mode.
// It panics if the pair carries a mode other than ReplicationModePull or
// ReplicationModePush, which cannot happen for pairs built by the
// NewEndpointPair* constructors.
func (p EndpointPair) Receiver() ReplicationEndpoint {
	switch p.m {
	case ReplicationModePull:
		return p.b
	case ReplicationModePush:
		return p.a
	}
	// panic is a terminating statement, so no trailing return is needed.
	panic("should not be reached")
}
// Mode returns the replication mode the pair was constructed with.
func (p EndpointPair) Mode() ReplicationMode {
	return p.m
}
// Replicate walks all filesystems listed by the pair's sender, computes the
// incremental path between receiver and sender for each of them and hands
// that path to ipr.
//
// Errors and conflicts are currently not reported: a failure to list the
// sender's filesystems aborts the whole run, while a failure or conflict for
// a single filesystem only skips that filesystem. ctx is passed through to
// ipr.Replicate and is not otherwise inspected here.
func Replicate(ctx context.Context, ep EndpointPair, ipr IncrementalPathReplicator) {

	sfss, err := ep.Sender().ListFilesystems()
	if err != nil {
		// log error
		return
	}

	for _, fs := range sfss {
		// Check the sender's error before issuing the receiver call.
		// Otherwise it would be overwritten and a nil version list would
		// reach IncrementalPath, which panics on nil input.
		sfsvs, err := ep.Sender().ListFilesystemVersions(fs.Path)
		if err != nil {
			// log and ignore
			continue
		}

		rfsvs, err := ep.Receiver().ListFilesystemVersions(fs.Path)
		if err != nil {
			if _, ok := err.(FilteredError); ok {
				// Remote does not map filesystem, don't try to tx it
				continue
			}
			// log and ignore
			continue
		}

		path, conflict := IncrementalPath(rfsvs, sfsvs)
		if conflict != nil {
			// handle or ignore for now
			continue
		}

		ipr.Replicate(ctx, ep.Sender(), ep.Receiver(), NewCopier(), fs, path)
	}

}
// Sender produces the stream for a SendRequest.
type Sender interface {
	// Send returns the response carrying the stream for r, or an error.
	Send(r SendRequest) (SendResponse, error)
}
// Receiver accepts a stream for a ReceiveRequest.
type Receiver interface {
	// Receive returns the writer the stream must be written to, or an error.
	Receive(r ReceiveRequest) (io.Writer, error)
}
// Copier moves a stream from a reader to a writer. It exists so that the
// copy step of a replication can be replaced, e.g. in tests.
type Copier interface {
	// Copy has the same signature as io.Copy.
	Copy(writer io.Writer, reader io.Reader) (int64, error)
}
// copier is the default Copier; it delegates to io.Copy.
type copier struct{}

// Copy copies from reader to writer using io.Copy and returns its results
// unchanged.
func (copier) Copy(writer io.Writer, reader io.Reader) (int64, error) {
	written, err := io.Copy(writer, reader)
	return written, err
}
// NewCopier returns the default Copier, which is backed by io.Copy.
func NewCopier() Copier {
	var c copier
	return c
}
// IncrementalPathReplicator replicates a filesystem along an incremental
// path as computed by IncrementalPath.
type IncrementalPathReplicator interface {
	// Replicate transfers fs from sender to receiver along path, using
	// copier to move each stream. It does not return an error.
	Replicate(ctx context.Context, sender Sender, receiver Receiver, copier Copier, fs Filesystem, path []zfs.FilesystemVersion)
}
// incrementalPathReplicator is the default, stateless
// IncrementalPathReplicator.
type incrementalPathReplicator struct{}

// NewIncrementalPathReplicator returns the default IncrementalPathReplicator.
func NewIncrementalPathReplicator() IncrementalPathReplicator {
	var r incrementalPathReplicator
	return r
}
// Replicate sends every consecutive (path[i], path[i+1]) pair from sender to
// receiver, in order, using copier to move each stream.
//
// A path with fewer than two versions results in no action. The
// filesystem's resume token is offered to both sides on the first step only.
// The first error from Send, Receive or Copy silently ends the replication;
// remaining steps are not attempted. ctx is currently unused.
func (incrementalPathReplicator) Replicate(ctx context.Context, sender Sender, receiver Receiver, copier Copier, fs Filesystem, path []zfs.FilesystemVersion) {

	for step := 0; step+1 < len(path); step++ {
		// Only the very first step may resume a previous transfer.
		var token string
		if step == 0 {
			token = fs.ResumeToken
		}

		sendRes, err := sender.Send(SendRequest{
			Filesystem:  fs.Path,
			From:        path[step].String(),
			To:          path[step+1].String(),
			ResumeToken: token,
		})
		if err != nil {
			// handle and ignore
			return
		}

		// try to consume stream
		sink, err := receiver.Receive(ReceiveRequest{
			Filesystem:  fs.Path,
			ResumeToken: token,
		})
		if err != nil {
			// handle and ignore
			return
		}

		if _, err := copier.Copy(sink, sendRes.Stream); err != nil {
			// handle and ignore
			return
		}

		// handle properties from sendRes
	}
}

View File

@ -0,0 +1,152 @@
package replication_test
import (
"context"
"github.com/stretchr/testify/assert"
"github.com/zrepl/zrepl/cmd/replication"
"github.com/zrepl/zrepl/zfs"
"io"
"testing"
)
// IncrementalPathSequenceStep describes one expected call on the mock and
// the values the mock returns for it. A step is consumed either by a Send
// call (Send* fields are used) or by a Receive call (Receive* fields are
// used).
type IncrementalPathSequenceStep struct {
	// SendRequest is the request a Send call is expected to carry.
	SendRequest  replication.SendRequest
	// SendResponse and SendError are returned from that Send call.
	SendResponse replication.SendResponse
	SendError    error

	// ReceiveRequest is the request a Receive call is expected to carry.
	ReceiveRequest replication.ReceiveRequest
	// ReceiveWriter and ReceiveError are returned from that Receive call.
	ReceiveWriter  io.Writer
	ReceiveError   error
}
// MockIncrementalPathRecorder implements both Send and Receive and checks
// every call against a pre-defined sequence of steps. Send and Receive share
// a single position, so the sequence fixes the interleaving of the calls.
type MockIncrementalPathRecorder struct {
	// T is the test that is failed on unexpected or mismatching calls.
	T        *testing.T
	// Sequence is the list of expected calls.
	Sequence []IncrementalPathSequenceStep
	// Pos is the index of the next expected step.
	Pos      int
}
// Receive consumes the next step of the sequence. It fails the test
// immediately if no step is left or if r differs from the step's
// ReceiveRequest; otherwise it returns the step's ReceiveWriter and
// ReceiveError.
func (m *MockIncrementalPathRecorder) Receive(r replication.ReceiveRequest) (io.Writer, error) {
	pos := m.Pos
	if pos >= len(m.Sequence) {
		m.T.Fatal("unexpected Receive")
	}
	step := m.Sequence[pos]
	m.Pos = pos + 1

	if matches := assert.Equal(m.T, step.ReceiveRequest, r); !matches {
		m.T.FailNow()
	}
	return step.ReceiveWriter, step.ReceiveError
}
// Send consumes the next step of the sequence. It fails the test immediately
// if no step is left or if r differs from the step's SendRequest; otherwise
// it returns the step's SendResponse and SendError.
func (m *MockIncrementalPathRecorder) Send(r replication.SendRequest) (replication.SendResponse, error) {
	pos := m.Pos
	if pos >= len(m.Sequence) {
		m.T.Fatal("unexpected Send")
	}
	step := m.Sequence[pos]
	m.Pos = pos + 1

	if matches := assert.Equal(m.T, step.SendRequest, r); !matches {
		m.T.FailNow()
	}
	return step.SendResponse, step.SendError
}
// Finished reports whether every step of the sequence has been consumed.
func (m *MockIncrementalPathRecorder) Finished() bool {
	return m.Pos == len(m.Sequence)
}
// DiscardCopier is a Copier for tests that does not touch its arguments.
type DiscardCopier struct{}

// Copy ignores writer and reader and always reports zero bytes copied and no
// error.
func (DiscardCopier) Copy(writer io.Writer, reader io.Reader) (int64, error) {
	var copied int64
	return copied, nil
}
// IncrementalPathReplicatorTest is one table entry for testing the default
// IncrementalPathReplicator against a mocked sender/receiver.
type IncrementalPathReplicatorTest struct {
	// Msg is logged at the start of the test.
	Msg        string
	// Filesystem and Path are passed to Replicate.
	Filesystem replication.Filesystem
	Path       []zfs.FilesystemVersion
	// Steps is the exact sequence of Send/Receive calls expected on the mock.
	Steps      []IncrementalPathSequenceStep
}
// Test runs the default IncrementalPathReplicator with a mock that serves as
// both sender and receiver, and asserts that the replicator performed
// exactly the calls listed in Steps.
func (test *IncrementalPathReplicatorTest) Test(t *testing.T) {

	t.Log(test.Msg)

	mock := &MockIncrementalPathRecorder{T: t, Sequence: test.Steps}

	replicator := replication.NewIncrementalPathReplicator()
	replicator.Replicate(context.TODO(), mock, mock, DiscardCopier{}, test.Filesystem, test.Path)

	// Every expected step must have been consumed by the replicator.
	allConsumed := mock.Finished()
	assert.True(t, allConsumed)

}
// TestIncrementalPathReplicator_Replicate checks the call sequence the
// default replicator issues: one Send followed by one Receive per path step,
// with the resume token only on the first step, and no calls at all for
// paths with fewer than two versions.
func TestIncrementalPathReplicator_Replicate(t *testing.T) {

	cases := []IncrementalPathReplicatorTest{
		{
			Msg: "generic happy place with resume token",
			Filesystem: replication.Filesystem{
				Path:        "foo/bar",
				ResumeToken: "blafoo",
			},
			Path: fsvlist("@a,1", "@b,2", "@c,3"),
			Steps: []IncrementalPathSequenceStep{
				{
					SendRequest: replication.SendRequest{
						Filesystem:  "foo/bar",
						From:        "@a,1",
						To:          "@b,2",
						ResumeToken: "blafoo",
					},
				},
				{
					ReceiveRequest: replication.ReceiveRequest{
						Filesystem:  "foo/bar",
						ResumeToken: "blafoo",
					},
				},
				{
					SendRequest: replication.SendRequest{
						Filesystem: "foo/bar",
						From:       "@b,2",
						To:         "@c,3",
					},
				},
				{
					ReceiveRequest: replication.ReceiveRequest{
						Filesystem: "foo/bar",
					},
				},
			},
		},
		{
			Msg: "no action on empty sequence",
			Filesystem: replication.Filesystem{
				Path: "foo/bar",
			},
			Path:  fsvlist(),
			Steps: []IncrementalPathSequenceStep{},
		},
		{
			Msg: "no action on invalid path",
			Filesystem: replication.Filesystem{
				Path: "foo/bar",
			},
			Path:  fsvlist("@justone,1"),
			Steps: []IncrementalPathSequenceStep{},
		},
	}

	for i := range cases {
		cases[i].Test(t)
	}

}