From 38532abf452699cca2efcc32eaaea203bd6b80dd Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 16 Aug 2018 21:05:21 +0200 Subject: [PATCH] enforce encapsulation by breaking up replication package into packages not perfect yet, public shouldn't be required to use 'common' package to use replication package --- Makefile | 9 +- cmd/config_job_local.go | 6 +- cmd/config_job_pull.go | 10 +- cmd/replication.go | 77 +- cmd/replication.v2/diff_test.go | 268 ------- cmd/replication.v2/plan.go | 666 ------------------ cmd/replication.v2/replication.go | 181 ----- cmd/replication.v2/replication_test.go | 181 ----- cmd/replication.v2/report.go | 94 --- cmd/replication/common/common.go | 91 +++ cmd/replication/internal/fsfsm/fsfsm.go | 361 ++++++++++ .../fsfsm}/fsreplicationstate_string.go | 2 +- .../fsfsm}/fsreplicationstepstate_string.go | 2 +- .../internal/mainfsm}/diff.go | 4 +- cmd/replication/internal/mainfsm/mainfsm.go | 342 +++++++++ .../internal/mainfsm/queue/queue.go | 124 ++++ .../mainfsm}/replicationstate_string.go | 2 +- .../pdu}/pdu.pb.go | 92 +-- .../pdu}/pdu.proto | 2 +- .../pdu}/pdu_extras.go | 13 +- .../pdu}/pdu_test.go | 2 +- cmd/replication/replication.go | 19 + 22 files changed, 1057 insertions(+), 1491 deletions(-) delete mode 100644 cmd/replication.v2/diff_test.go delete mode 100644 cmd/replication.v2/plan.go delete mode 100644 cmd/replication.v2/replication.go delete mode 100644 cmd/replication.v2/replication_test.go delete mode 100644 cmd/replication.v2/report.go create mode 100644 cmd/replication/common/common.go create mode 100644 cmd/replication/internal/fsfsm/fsfsm.go rename cmd/{replication.v2 => replication/internal/fsfsm}/fsreplicationstate_string.go (97%) rename cmd/{replication.v2 => replication/internal/fsfsm}/fsreplicationstepstate_string.go (97%) rename cmd/{replication.v2 => replication/internal/mainfsm}/diff.go (98%) create mode 100644 cmd/replication/internal/mainfsm/mainfsm.go create mode 100644 cmd/replication/internal/mainfsm/queue/queue.go rename cmd/{replication.v2 => replication/internal/mainfsm}/replicationstate_string.go (97%) rename cmd/{replication.v2 => replication/pdu}/pdu.pb.go (72%) rename cmd/{replication.v2 => replication/pdu}/pdu.proto (98%) rename cmd/{replication.v2 => replication/pdu}/pdu_extras.go (87%) rename cmd/{replication.v2 => replication/pdu}/pdu_test.go (98%) create mode 100644 cmd/replication/replication.go diff --git a/Makefile b/Makefile index 3970783..5944a64 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,12 @@ .DEFAULT_GOAL := build ROOT := github.com/zrepl/zrepl -SUBPKGS := cmd logger rpc util zfs +SUBPKGS := cmd +SUBPKGS += cmd/replication +SUBPKGS += cmd/replication/internal/common +SUBPKGS += cmd/replication/internal/mainfsm +SUBPKGS += cmd/replication/internal/fsfsm +SUBPKGS += logger util zfs _TESTPKGS := $(ROOT) $(foreach p,$(SUBPKGS),$(ROOT)/$(p)) @@ -26,10 +31,10 @@ vendordeps: dep ensure -v -vendor-only generate: #not part of the build, must do that manually + protoc -I=cmd/replication/pdu --go_out=cmd/replication/pdu cmd/replication/pdu/pdu.proto @for pkg in $(_TESTPKGS); do\ go generate "$$pkg" || exit 1; \ done; - protoc -I=cmd/replication --go_out=cmd/replication cmd/replication/pdu.proto # FIXME fix docker build! 
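A consumer-side sketch of the new package layout, assuming only the exported surface visible in this patch (replication.NewReplication, Replication.Drive, common.NewEndpointPairPull, common.ContextWithLogger, common.ReplicationEndpoint, common.Logger); it mirrors how the local and pull jobs below wire their endpoints together, and the package/function name "example"/"pull" here are illustrative only:

package example

import (
	"context"

	"github.com/zrepl/zrepl/cmd/replication"
	"github.com/zrepl/zrepl/cmd/replication/common"
)

// pull drives one pull-mode replication run from sender to receiver.
// Both endpoints implement common.ReplicationEndpoint, i.e. ListFilesystems,
// ListFilesystemVersions, Send and Receive expressed in terms of the pdu types.
func pull(ctx context.Context, log common.Logger, sender, receiver common.ReplicationEndpoint) {
	// The replication logger travels in the context, as in the pull job below.
	ctx = common.ContextWithLogger(ctx, log)

	rep := replication.NewReplication()
	rep.Drive(ctx, common.NewEndpointPairPull(sender, receiver))
}

Drive runs the main state machine until it reaches a terminal state (Completed or ContextDone on cancellation); the per-filesystem stepping is handled by the internal fsfsm and mainfsm packages introduced further down in this patch.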
diff --git a/cmd/config_job_local.go b/cmd/config_job_local.go index f1902e1..347a9d8 100644 --- a/cmd/config_job_local.go +++ b/cmd/config_job_local.go @@ -8,7 +8,8 @@ import ( "github.com/pkg/errors" "github.com/zrepl/zrepl/zfs" "sync" - "github.com/zrepl/zrepl/cmd/replication.v2" + "github.com/zrepl/zrepl/cmd/replication" + "github.com/zrepl/zrepl/cmd/replication/common" ) type LocalJob struct { @@ -146,7 +147,8 @@ outer: j.mainTask.Log().Debug("replicating from lhs to rhs") j.mainTask.Enter("replicate") - replication.Replicate(ctx, replication.NewEndpointPairPull(sender, receiver), nil) // FIXME + rep := replication.NewReplication() + rep.Drive(ctx, common.NewEndpointPairPull(sender, receiver)) j.mainTask.Finish() diff --git a/cmd/config_job_pull.go b/cmd/config_job_pull.go index f3e6c92..bebfbd3 100644 --- a/cmd/config_job_pull.go +++ b/cmd/config_job_pull.go @@ -13,7 +13,8 @@ import ( "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "github.com/problame/go-streamrpc" - "github.com/zrepl/zrepl/cmd/replication.v2" + "github.com/zrepl/zrepl/cmd/replication" + "github.com/zrepl/zrepl/cmd/replication/common" ) type PullJob struct { @@ -29,7 +30,7 @@ type PullJob struct { Debug JobDebugSettings task *Task - rep *replication.Replication + rep replication.Replication } func parsePullJob(c JobParsingContext, name string, i map[string]interface{}) (j *PullJob, err error) { @@ -188,13 +189,12 @@ func (j *PullJob) doRun(ctx context.Context) { } - ctx = replication.ContextWithLogger(ctx, replicationLogAdaptor{j.task.Log().WithField("subsystem", "replication")}) + ctx = common.ContextWithLogger(ctx, replicationLogAdaptor{j.task.Log().WithField("subsystem", "replication")}) ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{j.task.Log().WithField("subsystem", "rpc.protocol")}) ctx = context.WithValue(ctx, contextKeyLog, j.task.Log().WithField("subsystem", "rpc.endpoint")) j.rep = replication.NewReplication() - retryNow := make(chan struct{}) - j.rep.Drive(ctx, replication.NewEndpointPairPull(sender, puller), retryNow) + j.rep.Drive(ctx, common.NewEndpointPairPull(sender, puller)) client.Close() j.task.Finish() diff --git a/cmd/replication.go b/cmd/replication.go index b7d1c56..d0f3ed9 100644 --- a/cmd/replication.go +++ b/cmd/replication.go @@ -2,7 +2,8 @@ package cmd import ( "fmt" - "github.com/zrepl/zrepl/cmd/replication.v2" + "github.com/zrepl/zrepl/cmd/replication/common" + "github.com/zrepl/zrepl/cmd/replication/pdu" "github.com/problame/go-streamrpc" "github.com/zrepl/zrepl/zfs" "io" @@ -31,14 +32,14 @@ func NewSenderEndpoint(fsf zfs.DatasetFilter, fsvf zfs.FilesystemVersionFilter) return &SenderEndpoint{fsf, fsvf} } -func (p *SenderEndpoint) ListFilesystems(ctx context.Context) ([]*replication.Filesystem, error) { +func (p *SenderEndpoint) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) { fss, err := zfs.ZFSListMapping(p.FSFilter) if err != nil { return nil, err } - rfss := make([]*replication.Filesystem, len(fss)) + rfss := make([]*pdu.Filesystem, len(fss)) for i := range fss { - rfss[i] = &replication.Filesystem{ + rfss[i] = &pdu.Filesystem{ Path: fss[i].ToString(), // FIXME: not supporting ResumeToken yet } @@ -46,7 +47,7 @@ func (p *SenderEndpoint) ListFilesystems(ctx context.Context) ([]*replication.Fi return rfss, nil } -func (p *SenderEndpoint) ListFilesystemVersions(ctx context.Context, fs string) ([]*replication.FilesystemVersion, error) { +func (p *SenderEndpoint) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, 
error) { dp, err := zfs.NewDatasetPath(fs) if err != nil { return nil, err @@ -56,20 +57,20 @@ func (p *SenderEndpoint) ListFilesystemVersions(ctx context.Context, fs string) return nil, err } if !pass { - return nil, replication.NewFilteredError(fs) + return nil, common.NewFilteredError(fs) } fsvs, err := zfs.ZFSListFilesystemVersions(dp, p.FilesystemVersionFilter) if err != nil { return nil, err } - rfsvs := make([]*replication.FilesystemVersion, len(fsvs)) + rfsvs := make([]*pdu.FilesystemVersion, len(fsvs)) for i := range fsvs { - rfsvs[i] = replication.FilesystemVersionFromZFS(fsvs[i]) + rfsvs[i] = pdu.FilesystemVersionFromZFS(fsvs[i]) } return rfsvs, nil } -func (p *SenderEndpoint) Send(ctx context.Context, r *replication.SendReq) (*replication.SendRes, io.ReadCloser, error) { +func (p *SenderEndpoint) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) { dp, err := zfs.NewDatasetPath(r.Filesystem) if err != nil { return nil, nil, err @@ -79,16 +80,16 @@ func (p *SenderEndpoint) Send(ctx context.Context, r *replication.SendReq) (*rep return nil, nil, err } if !pass { - return nil, nil, replication.NewFilteredError(r.Filesystem) + return nil, nil, common.NewFilteredError(r.Filesystem) } stream, err := zfs.ZFSSend(r.Filesystem, r.From, r.To) if err != nil { return nil, nil, err } - return &replication.SendRes{}, stream, nil + return &pdu.SendRes{}, stream, nil } -func (p *SenderEndpoint) Receive(ctx context.Context, r *replication.ReceiveReq, sendStream io.ReadCloser) (error) { +func (p *SenderEndpoint) Receive(ctx context.Context, r *pdu.ReceiveReq, sendStream io.ReadCloser) (error) { return fmt.Errorf("sender endpoint does not receive") } @@ -108,23 +109,23 @@ func NewReceiverEndpoint(fsmap *DatasetMapFilter, fsvf zfs.FilesystemVersionFilt return &ReceiverEndpoint{fsmapInv, fsmap, fsvf}, nil } -func (e *ReceiverEndpoint) ListFilesystems(ctx context.Context) ([]*replication.Filesystem, error) { +func (e *ReceiverEndpoint) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) { filtered, err := zfs.ZFSListMapping(e.fsmapInv.AsFilter()) if err != nil { return nil, errors.Wrap(err, "error checking client permission") } - fss := make([]*replication.Filesystem, len(filtered)) + fss := make([]*pdu.Filesystem, len(filtered)) for i, a := range filtered { mapped, err := e.fsmapInv.Map(a) if err != nil { return nil, err } - fss[i] = &replication.Filesystem{Path: mapped.ToString()} + fss[i] = &pdu.Filesystem{Path: mapped.ToString()} } return fss, nil } -func (e *ReceiverEndpoint) ListFilesystemVersions(ctx context.Context, fs string) ([]*replication.FilesystemVersion, error) { +func (e *ReceiverEndpoint) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) { p, err := zfs.NewDatasetPath(fs) if err != nil { return nil, err @@ -142,19 +143,19 @@ func (e *ReceiverEndpoint) ListFilesystemVersions(ctx context.Context, fs string return nil, err } - rfsvs := make([]*replication.FilesystemVersion, len(fsvs)) + rfsvs := make([]*pdu.FilesystemVersion, len(fsvs)) for i := range fsvs { - rfsvs[i] = replication.FilesystemVersionFromZFS(fsvs[i]) + rfsvs[i] = pdu.FilesystemVersionFromZFS(fsvs[i]) } return rfsvs, nil } -func (e *ReceiverEndpoint) Send(ctx context.Context, req *replication.SendReq) (*replication.SendRes, io.ReadCloser, error) { +func (e *ReceiverEndpoint) Send(ctx context.Context, req *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) { return nil, nil, errors.New("receiver endpoint does not send") } -func (e 
*ReceiverEndpoint) Receive(ctx context.Context, req *replication.ReceiveReq, sendStream io.ReadCloser) error { +func (e *ReceiverEndpoint) Receive(ctx context.Context, req *pdu.ReceiveReq, sendStream io.ReadCloser) error { defer sendStream.Close() p, err := zfs.NewDatasetPath(req.Filesystem) @@ -236,8 +237,8 @@ type RemoteEndpoint struct { *streamrpc.Client } -func (s RemoteEndpoint) ListFilesystems(ctx context.Context) ([]*replication.Filesystem, error) { - req := replication.ListFilesystemReq{} +func (s RemoteEndpoint) ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) { + req := pdu.ListFilesystemReq{} b, err := proto.Marshal(&req) if err != nil { return nil, err @@ -250,15 +251,15 @@ func (s RemoteEndpoint) ListFilesystems(ctx context.Context) ([]*replication.Fil rs.Close() return nil, errors.New("response contains unexpected stream") } - var res replication.ListFilesystemRes + var res pdu.ListFilesystemRes if err := proto.Unmarshal(rb.Bytes(), &res); err != nil { return nil, err } return res.Filesystems, nil } -func (s RemoteEndpoint) ListFilesystemVersions(ctx context.Context, fs string) ([]*replication.FilesystemVersion, error) { - req := replication.ListFilesystemVersionsReq{ +func (s RemoteEndpoint) ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) { + req := pdu.ListFilesystemVersionsReq{ Filesystem: fs, } b, err := proto.Marshal(&req) @@ -273,14 +274,14 @@ func (s RemoteEndpoint) ListFilesystemVersions(ctx context.Context, fs string) ( rs.Close() return nil, errors.New("response contains unexpected stream") } - var res replication.ListFilesystemVersionsRes + var res pdu.ListFilesystemVersionsRes if err := proto.Unmarshal(rb.Bytes(), &res); err != nil { return nil, err } return res.Versions, nil } -func (s RemoteEndpoint) Send(ctx context.Context, r *replication.SendReq) (*replication.SendRes, io.ReadCloser, error) { +func (s RemoteEndpoint) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) { b, err := proto.Marshal(r) if err != nil { return nil, nil, err @@ -292,7 +293,7 @@ func (s RemoteEndpoint) Send(ctx context.Context, r *replication.SendReq) (*repl if rs == nil { return nil, nil, errors.New("response does not contain a stream") } - var res replication.SendRes + var res pdu.SendRes if err := proto.Unmarshal(rb.Bytes(), &res); err != nil { rs.Close() return nil, nil, err @@ -301,7 +302,7 @@ func (s RemoteEndpoint) Send(ctx context.Context, r *replication.SendReq) (*repl return &res, rs, nil } -func (s RemoteEndpoint) Receive(ctx context.Context, r *replication.ReceiveReq, sendStream io.ReadCloser) (error) { +func (s RemoteEndpoint) Receive(ctx context.Context, r *pdu.ReceiveReq, sendStream io.ReadCloser) (error) { defer sendStream.Close() b, err := proto.Marshal(r) if err != nil { @@ -315,7 +316,7 @@ func (s RemoteEndpoint) Receive(ctx context.Context, r *replication.ReceiveReq, rs.Close() return errors.New("response contains unexpected stream") } - var res replication.ReceiveRes + var res pdu.ReceiveRes if err := proto.Unmarshal(rb.Bytes(), &res); err != nil { return err } @@ -323,14 +324,14 @@ func (s RemoteEndpoint) Receive(ctx context.Context, r *replication.ReceiveReq, } type HandlerAdaptor struct { - ep replication.ReplicationEndpoint + ep common.ReplicationEndpoint } func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructured *bytes.Buffer, reqStream io.ReadCloser) (resStructured *bytes.Buffer, resStream io.ReadCloser, err error) { switch endpoint { case 
RPCListFilesystems: - var req replication.ListFilesystemReq + var req pdu.ListFilesystemReq if err := proto.Unmarshal(reqStructured.Bytes(), &req); err != nil { return nil, nil, err } @@ -338,7 +339,7 @@ func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructu if err != nil { return nil, nil, err } - res := &replication.ListFilesystemRes{ + res := &pdu.ListFilesystemRes{ Filesystems: fsses, } b, err := proto.Marshal(res) @@ -349,7 +350,7 @@ func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructu case RPCListFilesystemVersions: - var req replication.ListFilesystemVersionsReq + var req pdu.ListFilesystemVersionsReq if err := proto.Unmarshal(reqStructured.Bytes(), &req); err != nil { return nil, nil, err } @@ -357,7 +358,7 @@ func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructu if err != nil { return nil, nil, err } - res := &replication.ListFilesystemVersionsRes{ + res := &pdu.ListFilesystemVersionsRes{ Versions: fsvs, } b, err := proto.Marshal(res) @@ -368,7 +369,7 @@ func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructu case RPCSend: - var req replication.SendReq + var req pdu.SendReq if err := proto.Unmarshal(reqStructured.Bytes(), &req); err != nil { return nil, nil, err } @@ -384,7 +385,7 @@ func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructu case RPCReceive: - var req replication.ReceiveReq + var req pdu.ReceiveReq if err := proto.Unmarshal(reqStructured.Bytes(), &req); err != nil { return nil, nil, err } @@ -392,7 +393,7 @@ func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructu if err != nil { return nil, nil, err } - b, err := proto.Marshal(&replication.ReceiveRes{}) + b, err := proto.Marshal(&pdu.ReceiveRes{}) if err != nil { return nil, nil, err } diff --git a/cmd/replication.v2/diff_test.go b/cmd/replication.v2/diff_test.go deleted file mode 100644 index cf9b771..0000000 --- a/cmd/replication.v2/diff_test.go +++ /dev/null @@ -1,268 +0,0 @@ -package replication_test - -import ( - "github.com/stretchr/testify/assert" - "github.com/zrepl/zrepl/cmd/replication" - "strconv" - "strings" - "testing" - "time" -) - -func fsvlist(fsv ...string) (r []*replication.FilesystemVersion) { - - r = make([]*replication.FilesystemVersion, len(fsv)) - for i, f := range fsv { - - // parse the id from fsvlist. 
it is used to derivce Guid,CreateTXG and Creation attrs - split := strings.Split(f, ",") - if len(split) != 2 { - panic("invalid fsv spec") - } - id, err := strconv.Atoi(split[1]) - if err != nil { - panic(err) - } - - if strings.HasPrefix(f, "#") { - r[i] = &replication.FilesystemVersion{ - Name: strings.TrimPrefix(f, "#"), - Type: replication.FilesystemVersion_Bookmark, - Guid: uint64(id), - CreateTXG: uint64(id), - Creation: time.Unix(0, 0).Add(time.Duration(id) * time.Second).Format(time.RFC3339), - } - } else if strings.HasPrefix(f, "@") { - r[i] = &replication.FilesystemVersion{ - Name: strings.TrimPrefix(f, "@"), - Type: replication.FilesystemVersion_Snapshot, - Guid: uint64(id), - CreateTXG: uint64(id), - Creation: time.Unix(0, 0).Add(time.Duration(id) * time.Second).Format(time.RFC3339), - } - } else { - panic("invalid character") - } - } - return -} - -type incPathResult struct { - incPath []*replication.FilesystemVersion - conflict error -} - -type IncrementalPathTest struct { - Msg string - Receiver, Sender []*replication.FilesystemVersion - ExpectIncPath []*replication.FilesystemVersion - ExpectNoCommonAncestor bool - ExpectDiverged *replication.ConflictDiverged - ExpectPanic bool -} - -func (tt *IncrementalPathTest) Test(t *testing.T) { - - t.Logf("test: %s", tt.Msg) - - if tt.ExpectPanic { - assert.Panics(t, func() { - replication.IncrementalPath(tt.Receiver, tt.Sender) - }) - return - } - - incPath, conflict := replication.IncrementalPath(tt.Receiver, tt.Sender) - - if tt.ExpectIncPath != nil { - assert.Nil(t, conflict) - assert.True(t, len(incPath) == 0 || len(incPath) >= 2) - assert.Equal(t, tt.ExpectIncPath, incPath) - return - } - if conflict == nil { - t.Logf("conflict is (unexpectly) \nincPath: %#v", incPath) - } - if tt.ExpectNoCommonAncestor { - assert.IsType(t, &replication.ConflictNoCommonAncestor{}, conflict) - // TODO check sorting - return - } - if tt.ExpectDiverged != nil { - if !assert.IsType(t, &replication.ConflictDiverged{}, conflict) { - return - } - c := conflict.(*replication.ConflictDiverged) - // TODO check sorting - assert.NotZero(t, c.CommonAncestor) - assert.NotEmpty(t, c.ReceiverOnly) - assert.Equal(t, tt.ExpectDiverged.ReceiverOnly, c.ReceiverOnly) - assert.Equal(t, tt.ExpectDiverged.SenderOnly, c.SenderOnly) - return - } - -} - -func TestIncrementalPlan_IncrementalSnapshots(t *testing.T) { - l := fsvlist - - tbl := []IncrementalPathTest{ - { - Msg: "basic functionality", - Receiver: l("@a,1", "@b,2"), - Sender: l("@a,1", "@b,2", "@c,3", "@d,4"), - ExpectIncPath: l("@b,2", "@c,3", "@d,4"), - }, - { - Msg: "no snaps on receiver yields no common ancestor", - Receiver: l(), - Sender: l("@a,1"), - ExpectNoCommonAncestor: true, - }, - { - Msg: "no snapshots on sender yields empty incremental path", - Receiver: l(), - Sender: l(), - ExpectIncPath: l(), - }, - { - Msg: "nothing to do yields empty incremental path", - Receiver: l("@a,1"), - Sender: l("@a,1"), - ExpectIncPath: l(), - }, - { - Msg: "drifting apart", - Receiver: l("@a,1", "@b,2"), - Sender: l("@c,3", "@d,4"), - ExpectNoCommonAncestor: true, - }, - { - Msg: "different snapshots on sender and receiver", - Receiver: l("@a,1", "@c,2"), - Sender: l("@a,1", "@b,3"), - ExpectDiverged: &replication.ConflictDiverged{ - CommonAncestor: l("@a,1")[0], - SenderOnly: l("@b,3"), - ReceiverOnly: l("@c,2"), - }, - }, - { - Msg: "snapshot on receiver not present on sender", - Receiver: l("@a,1", "@b,2"), - Sender: l("@a,1"), - ExpectDiverged: &replication.ConflictDiverged{ - CommonAncestor: l("@a,1")[0], - 
SenderOnly: l(), - ReceiverOnly: l("@b,2"), - }, - }, - { - Msg: "gaps before most recent common ancestor do not matter", - Receiver: l("@a,1", "@b,2", "@c,3"), - Sender: l("@a,1", "@c,3", "@d,4"), - ExpectIncPath: l("@c,3", "@d,4"), - }, - } - - for _, test := range tbl { - test.Test(t) - } - -} - -func TestIncrementalPlan_BookmarksSupport(t *testing.T) { - l := fsvlist - - tbl := []IncrementalPathTest{ - { - Msg: "bookmarks are used", - Receiver: l("@a,1"), - Sender: l("#a,1", "@b,2"), - ExpectIncPath: l("#a,1", "@b,2"), - }, - { - Msg: "boomarks are stripped from incPath (cannot send incrementally)", - Receiver: l("@a,1"), - Sender: l("#a,1", "#b,2", "@c,3"), - ExpectIncPath: l("#a,1", "@c,3"), - }, - { - Msg: "bookmarks are preferred over snapshots for start of incPath", - Receiver: l("@a,1"), - Sender: l("#a,1", "@a,1", "@b,2"), - ExpectIncPath: l("#a,1", "@b,2"), - }, - { - Msg: "bookmarks are preferred over snapshots for start of incPath (regardless of order)", - Receiver: l("@a,1"), - Sender: l("@a,1", "#a,1", "@b,2"), - ExpectIncPath: l("#a,1", "@b,2"), - }, - } - - for _, test := range tbl { - test.Test(t) - } - -} - -func TestSortVersionListByCreateTXGThenBookmarkLTSnapshot(t *testing.T) { - - type Test struct { - Msg string - Input, Output []*replication.FilesystemVersion - } - - l := fsvlist - - tbl := []Test{ - { - "snapshot sorting already sorted", - l("@a,1", "@b,2"), - l("@a,1", "@b,2"), - }, - { - "bookmark sorting already sorted", - l("#a,1", "#b,2"), - l("#a,1", "#b,2"), - }, - { - "snapshot sorting", - l("@b,2", "@a,1"), - l("@a,1", "@b,2"), - }, - { - "bookmark sorting", - l("#b,2", "#a,1"), - l("#a,1", "#b,2"), - }, - } - - for _, test := range tbl { - t.Logf("test: %s", test.Msg) - inputlen := len(test.Input) - sorted := replication.SortVersionListByCreateTXGThenBookmarkLTSnapshot(test.Input) - if len(sorted) != inputlen { - t.Errorf("lenghts of input and output do not match: %d vs %d", inputlen, len(sorted)) - continue - } - if !assert.Equal(t, test.Output, sorted) { - continue - } - last := sorted[0] - for _, s := range sorted[1:] { - if s.CreateTXG < last.CreateTXG { - t.Errorf("must be sorted ascending, got:\n\t%#v", sorted) - break - } - if s.CreateTXG == last.CreateTXG { - if last.Type == replication.FilesystemVersion_Bookmark && s.Type != replication.FilesystemVersion_Snapshot { - t.Errorf("snapshots must come after bookmarks") - } - } - last = s - } - } - -} diff --git a/cmd/replication.v2/plan.go b/cmd/replication.v2/plan.go deleted file mode 100644 index b0045ec..0000000 --- a/cmd/replication.v2/plan.go +++ /dev/null @@ -1,666 +0,0 @@ -package replication - -import ( - "context" - "errors" - "fmt" - "io" - "math/bits" - "net" - "sort" - "sync" - "time" -) - -//go:generate stringer -type=ReplicationState -type ReplicationState uint - -const ( - Planning ReplicationState = 1 << iota - PlanningError - Working - WorkingWait - Completed - ContextDone -) - -func (s ReplicationState) rsf() replicationStateFunc { - idx := bits.TrailingZeros(uint(s)) - if idx == bits.UintSize { - panic(s) // invalid value - } - m := []replicationStateFunc{ - rsfPlanning, - rsfPlanningError, - rsfWorking, - rsfWorkingWait, - nil, - nil, - } - return m[idx] -} - -type replicationQueueItem struct { - retriesSinceLastError int - // duplicates fsr.state to avoid accessing and locking fsr - state FSReplicationState - // duplicates fsr.current.nextStepDate to avoid accessing & locking fsr - nextStepDate time.Time - - fsr *FSReplication -} - -type Replication struct { - // lock protects 
all fields of this struct (but not the fields behind pointers!) - lock sync.Mutex - - state ReplicationState - - // Working, WorkingWait, Completed, ContextDone - queue *replicationQueue - completed []*FSReplication - active *FSReplication - - // PlanningError - planningError error - - // ContextDone - contextError error - - // PlanningError, WorkingWait - sleepUntil time.Time -} - -type replicationUpdater func(func(*Replication)) (newState ReplicationState) -type replicationStateFunc func(context.Context, EndpointPair, replicationUpdater) replicationStateFunc - -//go:generate stringer -type=FSReplicationState -type FSReplicationState uint - -const ( - FSReady FSReplicationState = 1 << iota - FSRetryWait - FSPermanentError - FSCompleted -) - -func (s FSReplicationState) fsrsf() fsrsf { - idx := bits.TrailingZeros(uint(s)) - if idx == bits.UintSize { - panic(s) - } - m := []fsrsf{ - fsrsfReady, - fsrsfRetryWait, - nil, - nil, - } - return m[idx] -} - -type FSReplication struct { - // lock protects all fields in this struct, but not the data behind pointers - lock sync.Mutex - state FSReplicationState - fs *Filesystem - err error - retryWaitUntil time.Time - completed, pending []*FSReplicationStep - current *FSReplicationStep -} - -func newReplicationQueueItemPermanentError(fs *Filesystem, err error) *replicationQueueItem { - return &replicationQueueItem{ - retriesSinceLastError: 0, - state: FSPermanentError, - fsr: &FSReplication{ - state: FSPermanentError, - fs: fs, - err: err, - }, - } -} - -func (b *replicationQueueItemBuilder) Complete() *replicationQueueItem { - if len(b.r.pending) > 0 { - b.r.state = FSReady - } else { - b.r.state = FSCompleted - } - r := b.r - return &replicationQueueItem{0, b.r.state, time.Time{}, r} -} - -//go:generate stringer -type=FSReplicationStepState -type FSReplicationStepState uint - -const ( - StepReady FSReplicationStepState = 1 << iota - StepRetry - StepPermanentError - StepCompleted -) - -type FSReplicationStep struct { - // only protects state, err - // from, to and fsrep are assumed to be immutable - lock sync.Mutex - - state FSReplicationStepState - from, to *FilesystemVersion - fsrep *FSReplication - - // both retry and permanent error - err error -} - -func (r *Replication) Drive(ctx context.Context, ep EndpointPair, retryNow chan struct{}) { - - var u replicationUpdater = func(f func(*Replication)) ReplicationState { - r.lock.Lock() - defer r.lock.Unlock() - if f != nil { - f(r) - } - return r.state - } - - var s replicationStateFunc = rsfPlanning - var pre, post ReplicationState - for s != nil { - preTime := time.Now() - pre = u(nil) - s = s(ctx, ep, u) - delta := time.Now().Sub(preTime) - post = u(nil) - getLogger(ctx). - WithField("transition", fmt.Sprintf("%s => %s", pre, post)). - WithField("duration", delta). - Debug("main state transition") - } - - getLogger(ctx). - WithField("final_state", post). 
- Debug("main final state") -} - -func rsfPlanning(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc { - - log := getLogger(ctx) - - handlePlanningError := func(err error) replicationStateFunc { - return u(func(r *Replication) { - r.planningError = err - r.state = PlanningError - }).rsf() - } - - sfss, err := ep.Sender().ListFilesystems(ctx) - if err != nil { - log.WithError(err).Error("error listing sender filesystems") - return handlePlanningError(err) - } - - rfss, err := ep.Receiver().ListFilesystems(ctx) - if err != nil { - log.WithError(err).Error("error listing receiver filesystems") - return handlePlanningError(err) - } - - q := newReplicationQueue() - mainlog := log - for _, fs := range sfss { - - log := mainlog.WithField("filesystem", fs.Path) - - log.Info("assessing filesystem") - - sfsvs, err := ep.Sender().ListFilesystemVersions(ctx, fs.Path) - if err != nil { - log.WithError(err).Error("cannot get remote filesystem versions") - return handlePlanningError(err) - } - - if len(sfsvs) <= 1 { - err := errors.New("sender does not have any versions") - log.Error(err.Error()) - q.Add(newReplicationQueueItemPermanentError(fs, err)) - continue - } - - receiverFSExists := false - for _, rfs := range rfss { - if rfs.Path == fs.Path { - receiverFSExists = true - } - } - - var rfsvs []*FilesystemVersion - if receiverFSExists { - rfsvs, err = ep.Receiver().ListFilesystemVersions(ctx, fs.Path) - if err != nil { - if _, ok := err.(FilteredError); ok { - log.Info("receiver ignores filesystem") - continue - } - log.WithError(err).Error("receiver error") - return handlePlanningError(err) - } - } else { - rfsvs = []*FilesystemVersion{} - } - - path, conflict := IncrementalPath(rfsvs, sfsvs) - if conflict != nil { - var msg string - path, msg = resolveConflict(conflict) // no shadowing allowed! - if path != nil { - log.WithField("conflict", conflict).Info("conflict") - log.WithField("resolution", msg).Info("automatically resolved") - } else { - log.WithField("conflict", conflict).Error("conflict") - log.WithField("problem", msg).Error("cannot resolve conflict") - } - } - if path == nil { - q.Add(newReplicationQueueItemPermanentError(fs, conflict)) - continue - } - - builder := buildReplicationQueueItem(fs) - if len(path) == 1 { - builder.AddStep(nil, path[0]) - } else { - for i := 0; i < len(path)-1; i++ { - builder.AddStep(path[i], path[i+1]) - } - } - qitem := builder.Complete() - q.Add(qitem) - } - - return u(func(r *Replication) { - r.completed = nil - r.queue = q - r.planningError = nil - r.state = Working - }).rsf() -} - -func rsfPlanningError(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc { - sleepTime := 10 * time.Second - u(func(r *Replication) { - r.sleepUntil = time.Now().Add(sleepTime) - }) - t := time.NewTimer(sleepTime) // FIXME make constant onfigurable - defer t.Stop() - select { - case <-ctx.Done(): - return u(func(r *Replication) { - r.state = ContextDone - r.contextError = ctx.Err() - }).rsf() - case <-t.C: - return u(func(r *Replication) { - r.state = Planning - }).rsf() - } -} - -func rsfWorking(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc { - - var active *replicationQueueItemHandle - - rsfNext := u(func(r *Replication) { - done, next := r.queue.GetNext() - r.completed = append(r.completed, done...) 
- if next == nil { - r.state = Completed - } - active = next - }).rsf() - - if active == nil { - return rsfNext - } - - state, nextStepDate := active.GetFSReplication().takeStep(ctx, ep) - - return u(func(r *Replication) { - active.Update(state, nextStepDate) - }).rsf() -} - -func rsfWorkingWait(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc { - sleepTime := 10 * time.Second - u(func(r *Replication) { - r.sleepUntil = time.Now().Add(sleepTime) - }) - t := time.NewTimer(sleepTime) - defer t.Stop() - select { - case <-ctx.Done(): - return u(func(r *Replication) { - r.state = ContextDone - r.contextError = ctx.Err() - }).rsf() - case <-t.C: - return u(func(r *Replication) { - r.state = Working - }).rsf() - } -} - -type replicationQueueItemBuilder struct { - r *FSReplication - steps []*FSReplicationStep -} - -func buildReplicationQueueItem(fs *Filesystem) *replicationQueueItemBuilder { - return &replicationQueueItemBuilder{ - r: &FSReplication{ - fs: fs, - pending: make([]*FSReplicationStep, 0), - }, - } -} - -func (b *replicationQueueItemBuilder) AddStep(from, to *FilesystemVersion) *replicationQueueItemBuilder { - step := &FSReplicationStep{ - state: StepReady, - fsrep: b.r, - from: from, - to: to, - } - b.r.pending = append(b.r.pending, step) - return b -} - -type replicationQueue []*replicationQueueItem - -func newReplicationQueue() *replicationQueue { - q := make(replicationQueue, 0) - return &q -} - -func (q replicationQueue) Len() int { return len(q) } -func (q replicationQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] } -func (q replicationQueue) Less(i, j int) bool { - a, b := q[i], q[j] - statePrio := func(x *replicationQueueItem) int { - if x.state&(FSReady|FSRetryWait) == 0 { - panic(x) - } - if x.state == FSReady { - return 0 - } - return 1 - } - - aprio, bprio := statePrio(a), statePrio(b) - if aprio != bprio { - return aprio < bprio - } - // now we know they are the same state - if a.state == FSReady { - return a.nextStepDate.Before(b.nextStepDate) - } - if a.state == FSRetryWait { - return a.retriesSinceLastError < b.retriesSinceLastError - } - panic("should not be reached") -} - -func (q *replicationQueue) sort() (done []*FSReplication) { - // pre-scan for everything that is not ready - newq := make(replicationQueue, 0, len(*q)) - done = make([]*FSReplication, 0, len(*q)) - for _, qitem := range *q { - if qitem.state&(FSReady|FSRetryWait) == 0 { - done = append(done, qitem.fsr) - } else { - newq = append(newq, qitem) - } - } - sort.SortStable(newq) // stable to avoid flickering in reports - *q = newq - return done -} - -// next remains valid until the next call to GetNext() -func (q *replicationQueue) GetNext() (done []*FSReplication, next *replicationQueueItemHandle) { - done = q.sort() - if len(*q) == 0 { - return done, nil - } - next = &replicationQueueItemHandle{(*q)[0]} - return done, next -} - -func (q *replicationQueue) Add(qitem *replicationQueueItem) { - *q = append(*q, qitem) -} - -func (q *replicationQueue) Foreach(fu func(*replicationQueueItemHandle)) { - for _, qitem := range *q { - fu(&replicationQueueItemHandle{qitem}) - } -} - -type replicationQueueItemHandle struct { - i *replicationQueueItem -} - -func (h replicationQueueItemHandle) GetFSReplication() *FSReplication { - return h.i.fsr -} - -func (h replicationQueueItemHandle) Update(newState FSReplicationState, nextStepDate time.Time) { - h.i.state = newState - h.i.nextStepDate = nextStepDate - if h.i.state&FSReady != 0 { - h.i.retriesSinceLastError = 0 - } else if 
h.i.state&FSRetryWait != 0 { - h.i.retriesSinceLastError++ - } -} - -func (f *FSReplication) takeStep(ctx context.Context, ep EndpointPair) (post FSReplicationState, nextStepDate time.Time) { - - var u fsrUpdater = func(fu func(*FSReplication)) FSReplicationState { - f.lock.Lock() - defer f.lock.Unlock() - if fu != nil { - fu(f) - } - return f.state - } - var s fsrsf = u(nil).fsrsf() - - pre := u(nil) - preTime := time.Now() - s = s(ctx, ep, u) - delta := time.Now().Sub(preTime) - post = u(func(f *FSReplication) { - if f.state != FSReady { - return - } - ct, err := f.current.to.CreationAsTime() - if err != nil { - panic(err) // FIXME - } - nextStepDate = ct - }) - - getLogger(ctx). - WithField("fs", f.fs.Path). - WithField("transition", fmt.Sprintf("%s => %s", pre, post)). - WithField("duration", delta). - Debug("fsr step taken") - - return post, nextStepDate -} - -type fsrUpdater func(func(fsr *FSReplication)) FSReplicationState -type fsrsf func(ctx context.Context, ep EndpointPair, u fsrUpdater) fsrsf - -func fsrsfReady(ctx context.Context, ep EndpointPair, u fsrUpdater) fsrsf { - - var current *FSReplicationStep - s := u(func(f *FSReplication) { - if f.current == nil { - if len(f.pending) == 0 { - f.state = FSCompleted - return - } - f.current = f.pending[0] - f.pending = f.pending[1:] - } - current = f.current - }) - if s != FSReady { - return s.fsrsf() - } - - stepState := current.do(ctx, ep) - - return u(func(f *FSReplication) { - switch stepState { - case StepCompleted: - f.completed = append(f.completed, f.current) - f.current = nil - if len(f.pending) > 0 { - f.state = FSReady - } else { - f.state = FSCompleted - } - case StepRetry: - f.retryWaitUntil = time.Now().Add(10 * time.Second) // FIXME make configurable - f.state = FSRetryWait - case StepPermanentError: - f.state = FSPermanentError - f.err = errors.New("a replication step failed with a permanent error") - default: - panic(f) - } - }).fsrsf() -} - -func fsrsfRetryWait(ctx context.Context, ep EndpointPair, u fsrUpdater) fsrsf { - var sleepUntil time.Time - u(func(f *FSReplication) { - sleepUntil = f.retryWaitUntil - }) - t := time.NewTimer(sleepUntil.Sub(time.Now())) - defer t.Stop() - select { - case <-ctx.Done(): - return u(func(f *FSReplication) { - f.state = FSPermanentError - f.err = ctx.Err() - }).fsrsf() - case <-t.C: - } - return u(func(f *FSReplication) { - f.state = FSReady - }).fsrsf() -} - -func (s *FSReplicationStep) do(ctx context.Context, ep EndpointPair) FSReplicationStepState { - - fs := s.fsrep.fs - - log := getLogger(ctx). - WithField("filesystem", fs.Path). 
- WithField("step", s.String()) - - updateStateError := func(err error) FSReplicationStepState { - s.lock.Lock() - defer s.lock.Unlock() - - s.err = err - switch err { - case io.EOF: - fallthrough - case io.ErrUnexpectedEOF: - fallthrough - case io.ErrClosedPipe: - s.state = StepRetry - return s.state - } - if _, ok := err.(net.Error); ok { - s.state = StepRetry - return s.state - } - s.state = StepPermanentError - return s.state - } - - updateStateCompleted := func() FSReplicationStepState { - s.lock.Lock() - defer s.lock.Unlock() - s.err = nil - s.state = StepCompleted - return s.state - } - - // FIXME refresh fs resume token - fs.ResumeToken = "" - - var sr *SendReq - if fs.ResumeToken != "" { - sr = &SendReq{ - Filesystem: fs.Path, - ResumeToken: fs.ResumeToken, - } - } else if s.from == nil { - sr = &SendReq{ - Filesystem: fs.Path, - From: s.to.RelName(), // FIXME fix protocol to use To, like zfs does internally - } - } else { - sr = &SendReq{ - Filesystem: fs.Path, - From: s.from.RelName(), - To: s.to.RelName(), - } - } - - log.WithField("request", sr).Debug("initiate send request") - sres, sstream, err := ep.Sender().Send(ctx, sr) - if err != nil { - log.WithError(err).Error("send request failed") - return updateStateError(err) - } - if sstream == nil { - err := errors.New("send request did not return a stream, broken endpoint implementation") - return updateStateError(err) - } - - rr := &ReceiveReq{ - Filesystem: fs.Path, - ClearResumeToken: !sres.UsedResumeToken, - } - log.WithField("request", rr).Debug("initiate receive request") - err = ep.Receiver().Receive(ctx, rr, sstream) - if err != nil { - log.WithError(err).Error("receive request failed (might also be error on sender)") - sstream.Close() - // This failure could be due to - // - an unexpected exit of ZFS on the sending side - // - an unexpected exit of ZFS on the receiving side - // - a connectivity issue - return updateStateError(err) - } - log.Info("receive finished") - return updateStateCompleted() - -} - -func (s *FSReplicationStep) String() string { - if s.from == nil { // FIXME: ZFS semantics are that to is nil on non-incremental send - return fmt.Sprintf("%s%s (full)", s.fsrep.fs.Path, s.to.RelName()) - } else { - return fmt.Sprintf("%s(%s => %s)", s.fsrep.fs.Path, s.from.RelName(), s.to.RelName()) - } -} diff --git a/cmd/replication.v2/replication.go b/cmd/replication.v2/replication.go deleted file mode 100644 index 83fd38b..0000000 --- a/cmd/replication.v2/replication.go +++ /dev/null @@ -1,181 +0,0 @@ -package replication - -import ( - "os" - "syscall" - "encoding/json" - "context" - "fmt" - "github.com/zrepl/zrepl/logger" - "io" - "os/signal" -) - -type ReplicationEndpoint interface { - // Does not include placeholder filesystems - ListFilesystems(ctx context.Context) ([]*Filesystem, error) - ListFilesystemVersions(ctx context.Context, fs string) ([]*FilesystemVersion, error) // fix depS - Send(ctx context.Context, r *SendReq) (*SendRes, io.ReadCloser, error) - Receive(ctx context.Context, r *ReceiveReq, sendStream io.ReadCloser) error -} - -type FilteredError struct{ fs string } - -func NewFilteredError(fs string) FilteredError { - return FilteredError{fs} -} - -func (f FilteredError) Error() string { return "endpoint does not allow access to filesystem " + f.fs } - -type ReplicationMode int - -const ( - ReplicationModePull ReplicationMode = iota - ReplicationModePush -) - -type EndpointPair struct { - a, b ReplicationEndpoint - m ReplicationMode -} - -func NewEndpointPairPull(sender, receiver 
ReplicationEndpoint) EndpointPair { - return EndpointPair{sender, receiver, ReplicationModePull} -} - -func NewEndpointPairPush(sender, receiver ReplicationEndpoint) EndpointPair { - return EndpointPair{receiver, sender, ReplicationModePush} -} - -func (p EndpointPair) Sender() ReplicationEndpoint { - switch p.m { - case ReplicationModePull: - return p.a - case ReplicationModePush: - return p.b - } - panic("should not be reached") - return nil -} - -func (p EndpointPair) Receiver() ReplicationEndpoint { - switch p.m { - case ReplicationModePull: - return p.b - case ReplicationModePush: - return p.a - } - panic("should not be reached") - return nil -} - -func (p EndpointPair) Mode() ReplicationMode { - return p.m -} - -type contextKey int - -const ( - contextKeyLog contextKey = iota -) - -//type Logger interface { -// Infof(fmt string, args ...interface{}) -// Errorf(fmt string, args ...interface{}) -//} - -//var _ Logger = nullLogger{} - -//type nullLogger struct{} -// -//func (nullLogger) Infof(fmt string, args ...interface{}) {} -//func (nullLogger) Errorf(fmt string, args ...interface{}) {} - -type Logger = logger.Logger - -func ContextWithLogger(ctx context.Context, l Logger) context.Context { - return context.WithValue(ctx, contextKeyLog, l) -} - -func getLogger(ctx context.Context) Logger { - l, ok := ctx.Value(contextKeyLog).(Logger) - if !ok { - l = logger.NewNullLogger() - } - return l -} - -func resolveConflict(conflict error) (path []*FilesystemVersion, msg string) { - if noCommonAncestor, ok := conflict.(*ConflictNoCommonAncestor); ok { - if len(noCommonAncestor.SortedReceiverVersions) == 0 { - // FIXME hard-coded replication policy: most recent - // snapshot as source - var mostRecentSnap *FilesystemVersion - for n := len(noCommonAncestor.SortedSenderVersions) - 1; n >= 0; n-- { - if noCommonAncestor.SortedSenderVersions[n].Type == FilesystemVersion_Snapshot { - mostRecentSnap = noCommonAncestor.SortedSenderVersions[n] - break - } - } - if mostRecentSnap == nil { - return nil, "no snapshots available on sender side" - } - return []*FilesystemVersion{mostRecentSnap}, fmt.Sprintf("start replication at most recent snapshot %s", mostRecentSnap.RelName()) - } - } - return nil, "no automated way to handle conflict type" -} - -func NewReplication() *Replication { - r := Replication{ - state: Planning, - } - return &r -} -// Replicate replicates filesystems from ep.Sender() to ep.Receiver(). -// -// All filesystems presented by the sending side are replicated, -// unless the receiver rejects a Receive request with a *FilteredError. -// -// If an error occurs when replicating a filesystem, that error is logged to the logger in ctx. -// Replicate continues with the replication of the remaining file systems. -// Depending on the type of error, failed replications are retried in an unspecified order (currently FIFO). 
-func Replicate(ctx context.Context, ep EndpointPair, retryNow chan struct{}) { - r := Replication{ - state: Planning, - } - - c := make(chan os.Signal) - defer close(c) - signal.Notify(c, syscall.SIGHUP) - go func() { - f, err := os.OpenFile("/tmp/report", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) - if err != nil { - getLogger(ctx).WithError(err).Error("cannot open report file") - panic(err) - } - defer f.Close() - for { - select { - case <-ctx.Done(): - return - case sig := <-c: - if sig == nil { - return - } - report := r.Report() - enc := json.NewEncoder(f) - enc.SetIndent(" ", " ") - if err := enc.Encode(report); err != nil { - getLogger(ctx).WithError(err).Error("cannot encode report") - panic(err) - } - f.Write([]byte("\n")) - f.Sync() - } - } - }() - - r.Drive(ctx, ep, retryNow) -} - diff --git a/cmd/replication.v2/replication_test.go b/cmd/replication.v2/replication_test.go deleted file mode 100644 index 00b3868..0000000 --- a/cmd/replication.v2/replication_test.go +++ /dev/null @@ -1,181 +0,0 @@ -package replication_test - -import ( - "context" - "github.com/stretchr/testify/assert" - "github.com/zrepl/zrepl/cmd/replication" - "io" - "testing" -) - -type IncrementalPathSequenceStep struct { - SendRequest *replication.SendReq - SendResponse *replication.SendRes - SendReader io.ReadCloser - SendError error - ReceiveRequest *replication.ReceiveReq - ReceiveError error -} - -type MockIncrementalPathRecorder struct { - T *testing.T - Sequence []IncrementalPathSequenceStep - Pos int -} - -func (m *MockIncrementalPathRecorder) Receive(ctx context.Context, r *replication.ReceiveReq, rs io.ReadCloser) (error) { - if m.Pos >= len(m.Sequence) { - m.T.Fatal("unexpected Receive") - } - i := m.Sequence[m.Pos] - m.Pos++ - if !assert.Equal(m.T, i.ReceiveRequest, r) { - m.T.FailNow() - } - return i.ReceiveError -} - -func (m *MockIncrementalPathRecorder) Send(ctx context.Context, r *replication.SendReq) (*replication.SendRes, io.ReadCloser, error) { - if m.Pos >= len(m.Sequence) { - m.T.Fatal("unexpected Send") - } - i := m.Sequence[m.Pos] - m.Pos++ - if !assert.Equal(m.T, i.SendRequest, r) { - m.T.FailNow() - } - return i.SendResponse, i.SendReader, i.SendError -} - -func (m *MockIncrementalPathRecorder) Finished() bool { - return m.Pos == len(m.Sequence) -} - -//type IncrementalPathReplicatorTest struct { -// Msg string -// Filesystem *replication.Filesystem -// Path []*replication.FilesystemVersion -// Steps []IncrementalPathSequenceStep -//} -// -//func (test *IncrementalPathReplicatorTest) Test(t *testing.T) { -// -// t.Log(test.Msg) -// -// rec := &MockIncrementalPathRecorder{ -// T: t, -// Sequence: test.Steps, -// } -// -// ctx := replication.ContextWithLogger(context.Background(), testLog{t}) -// -// ipr := replication.NewIncrementalPathReplicator() -// ipr.Replicate( -// ctx, -// rec, -// rec, -// DiscardCopier{}, -// test.Filesystem, -// test.Path, -// ) -// -// assert.True(t, rec.Finished()) -// -//} - -//type testLog struct { -// t *testing.T -//} -// -//var _ replication.Logger = testLog{} -// -//func (t testLog) Infof(fmt string, args ...interface{}) { -// t.t.Logf(fmt, args) -//} -//func (t testLog) Errorf(fmt string, args ...interface{}) { -// t.t.Logf(fmt, args) -//} - - -//func TestIncrementalPathReplicator_Replicate(t *testing.T) { -// -// tbl := []IncrementalPathReplicatorTest{ -// { -// Msg: "generic happy place with resume token", -// Filesystem: &replication.Filesystem{ -// Path: "foo/bar", -// ResumeToken: "blafoo", -// }, -// Path: fsvlist("@a,1", "@b,2", "@c,3"), -// 
Steps: []IncrementalPathSequenceStep{ -// { -// SendRequest: &replication.SendReq{ -// Filesystem: "foo/bar", -// From: "@a,1", -// To: "@b,2", -// ResumeToken: "blafoo", -// }, -// SendResponse: &replication.SendRes{ -// UsedResumeToken: true, -// }, -// }, -// { -// ReceiveRequest: &replication.ReceiveReq{ -// Filesystem: "foo/bar", -// ClearResumeToken: false, -// }, -// }, -// { -// SendRequest: &replication.SendReq{ -// Filesystem: "foo/bar", -// From: "@b,2", -// To: "@c,3", -// }, -// }, -// { -// ReceiveRequest: &replication.ReceiveReq{ -// Filesystem: "foo/bar", -// }, -// }, -// }, -// }, -// { -// Msg: "no action on empty sequence", -// Filesystem: &replication.Filesystem{ -// Path: "foo/bar", -// }, -// Path: fsvlist(), -// Steps: []IncrementalPathSequenceStep{}, -// }, -// { -// Msg: "full send on single entry path", -// Filesystem: &replication.Filesystem{ -// Path: "foo/bar", -// }, -// Path: fsvlist("@justone,1"), -// Steps: []IncrementalPathSequenceStep{ -// { -// SendRequest: &replication.SendReq{ -// Filesystem: "foo/bar", -// From: "@justone,1", -// To: "", // empty means full send -// }, -// SendResponse: &replication.SendRes{ -// UsedResumeToken: false, -// }, -// }, -// { -// ReceiveRequest: &replication.ReceiveReq{ -// Filesystem: "foo/bar", -// ClearResumeToken: false, -// }, -// }, -// }, -// }, -// } -// -// for _, test := range tbl { -// test.Test(t) -// } -// -//} diff --git a/cmd/replication.v2/report.go b/cmd/replication.v2/report.go deleted file mode 100644 index 6df5b68..0000000 --- a/cmd/replication.v2/report.go +++ /dev/null @@ -1,94 +0,0 @@ -package replication - -type Report struct { - Status string - Problem string - Completed []*FilesystemReplicationReport - Pending []*FilesystemReplicationReport - Active *FilesystemReplicationReport -} - -type StepReport struct { - From, To string - Status string - Problem string -} - -type FilesystemReplicationReport struct { - Filesystem string - Status string - Problem string - Steps []*StepReport -} - -func stepReportFromStep(step *FSReplicationStep) *StepReport { - var from string // FIXME follow same convention as ZFS: to should be nil on full send - if step.from != nil { - from = step.from.RelName() - } - rep := StepReport{ - From: from, - To: step.to.RelName(), - Status: step.state.String(), - } - return &rep -} - -// access to fsr's members must be exclusive -func filesystemReplicationReport(fsr *FSReplication) *FilesystemReplicationReport { - fsr.lock.Lock() - defer fsr.lock.Unlock() - - rep := FilesystemReplicationReport{ - Filesystem: fsr.fs.Path, - Status: fsr.state.String(), - } - - if fsr.state&FSPermanentError != 0 { - rep.Problem = fsr.err.Error() - return &rep - } - - rep.Steps = make([]*StepReport, 0, len(fsr.completed)+len(fsr.pending) + 1) - for _, step := range fsr.completed { - rep.Steps = append(rep.Steps, stepReportFromStep(step)) - } - if fsr.current != nil { - rep.Steps = append(rep.Steps, stepReportFromStep(fsr.current)) - } - for _, step := range fsr.pending { - rep.Steps = append(rep.Steps, stepReportFromStep(step)) - } - return &rep -} - -func (r *Replication) Report() *Report { - r.lock.Lock() - defer r.lock.Unlock() - - rep := Report{ - Status: r.state.String(), - } - - if r.state&(Planning|PlanningError|ContextDone) != 0 { - switch r.state { - case PlanningError: - rep.Problem = r.planningError.Error() - case ContextDone: - rep.Problem = r.contextError.Error() - } - return &rep - } - - rep.Pending = make([]*FilesystemReplicationReport, 0, r.queue.Len()) - rep.Completed = 
make([]*FilesystemReplicationReport, 0, len(r.completed)) // room for active (potentially) - - r.queue.Foreach(func (h *replicationQueueItemHandle){ - rep.Pending = append(rep.Pending, filesystemReplicationReport(h.GetFSReplication())) - }) - for _, fsr := range r.completed { - rep.Completed = append(rep.Completed, filesystemReplicationReport(fsr)) - } - - return &rep -} diff --git a/cmd/replication/common/common.go b/cmd/replication/common/common.go new file mode 100644 index 0000000..3166bfc --- /dev/null +++ b/cmd/replication/common/common.go @@ -0,0 +1,91 @@ +package common + +import ( + "context" + "io" + + "github.com/zrepl/zrepl/cmd/replication/pdu" + "github.com/zrepl/zrepl/logger" +) + +type contextKey int + +const ( + contextKeyLog contextKey = iota +) + +type Logger = logger.Logger + +func ContextWithLogger(ctx context.Context, l Logger) context.Context { + return context.WithValue(ctx, contextKeyLog, l) +} + +func GetLogger(ctx context.Context) Logger { + l, ok := ctx.Value(contextKeyLog).(Logger) + if !ok { + l = logger.NewNullLogger() + } + return l +} + +type ReplicationEndpoint interface { + // Does not include placeholder filesystems + ListFilesystems(ctx context.Context) ([]*pdu.Filesystem, error) + ListFilesystemVersions(ctx context.Context, fs string) ([]*pdu.FilesystemVersion, error) // fix depS + Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error) + Receive(ctx context.Context, r *pdu.ReceiveReq, sendStream io.ReadCloser) error +} + +type FilteredError struct{ fs string } + +func NewFilteredError(fs string) *FilteredError { + return &FilteredError{fs} +} + +func (f FilteredError) Error() string { return "endpoint does not allow access to filesystem " + f.fs } + +type ReplicationMode int + +const ( + ReplicationModePull ReplicationMode = iota + ReplicationModePush +) + +type EndpointPair struct { + a, b ReplicationEndpoint + m ReplicationMode +} + +func NewEndpointPairPull(sender, receiver ReplicationEndpoint) EndpointPair { + return EndpointPair{sender, receiver, ReplicationModePull} +} + +func NewEndpointPairPush(sender, receiver ReplicationEndpoint) EndpointPair { + return EndpointPair{receiver, sender, ReplicationModePush} +} + +func (p EndpointPair) Sender() ReplicationEndpoint { + switch p.m { + case ReplicationModePull: + return p.a + case ReplicationModePush: + return p.b + } + panic("should not be reached") + return nil +} + +func (p EndpointPair) Receiver() ReplicationEndpoint { + switch p.m { + case ReplicationModePull: + return p.b + case ReplicationModePush: + return p.a + } + panic("should not be reached") + return nil +} + +func (p EndpointPair) Mode() ReplicationMode { + return p.m +} diff --git a/cmd/replication/internal/fsfsm/fsfsm.go b/cmd/replication/internal/fsfsm/fsfsm.go new file mode 100644 index 0000000..d5c3629 --- /dev/null +++ b/cmd/replication/internal/fsfsm/fsfsm.go @@ -0,0 +1,361 @@ +package fsfsm + +import ( + "context" + "errors" + "fmt" + "io" + "math/bits" + "net" + "sync" + "time" + + "github.com/zrepl/zrepl/cmd/replication/pdu" + . 
"github.com/zrepl/zrepl/cmd/replication/common" +) + +type StepReport struct { + From, To string + Status string + Problem string +} + +type FilesystemReplicationReport struct { + Filesystem string + Status string + Problem string + Completed,Pending []*StepReport +} + + +//go:generate stringer -type=FSReplicationState +type FSReplicationState uint + +const ( + FSReady FSReplicationState = 1 << iota + FSRetryWait + FSPermanentError + FSCompleted +) + +func (s FSReplicationState) fsrsf() fsrsf { + idx := bits.TrailingZeros(uint(s)) + if idx == bits.UintSize { + panic(s) + } + m := []fsrsf{ + fsrsfReady, + fsrsfRetryWait, + nil, + nil, + } + return m[idx] +} + +type FSReplication struct { + // lock protects all fields in this struct, but not the data behind pointers + lock sync.Mutex + state FSReplicationState + fs string + err error + retryWaitUntil time.Time + completed, pending []*FSReplicationStep +} + +func (f *FSReplication) State() FSReplicationState { + f.lock.Lock() + defer f.lock.Unlock() + return f.state +} + +type FSReplicationBuilder struct { + r *FSReplication +} + +func BuildFSReplication(fs string) *FSReplicationBuilder { + return &FSReplicationBuilder{&FSReplication{fs: fs}} +} + +func (b *FSReplicationBuilder) AddStep(from, to FilesystemVersion) *FSReplicationBuilder { + step := &FSReplicationStep{ + state: StepReady, + fsrep: b.r, + from: from, + to: to, + } + b.r.pending = append(b.r.pending, step) + return b +} + +func (b *FSReplicationBuilder) Done() (r *FSReplication) { + if len(b.r.pending) > 0 { + b.r.state = FSReady + } else { + b.r.state = FSCompleted + } + r = b.r + b.r = nil + return r +} + +func NewFSReplicationWithPermanentError(fs string, err error) *FSReplication { + return &FSReplication{ + state: FSPermanentError, + fs: fs, + err: err, + } +} + + +//go:generate stringer -type=FSReplicationStepState +type FSReplicationStepState uint + +const ( + StepReady FSReplicationStepState = 1 << iota + StepRetry + StepPermanentError + StepCompleted +) + +type FilesystemVersion interface { + SnapshotTime() time.Time + RelName() string +} + +type FSReplicationStep struct { + // only protects state, err + // from, to and fsrep are assumed to be immutable + lock sync.Mutex + + state FSReplicationStepState + from, to FilesystemVersion + fsrep *FSReplication + + // both retry and permanent error + err error +} + +func (f *FSReplication) TakeStep(ctx context.Context, ep EndpointPair) (post FSReplicationState, nextStepDate time.Time) { + + var u fsrUpdater = func(fu func(*FSReplication)) FSReplicationState { + f.lock.Lock() + defer f.lock.Unlock() + if fu != nil { + fu(f) + } + return f.state + } + var s fsrsf = u(nil).fsrsf() + + pre := u(nil) + preTime := time.Now() + s = s(ctx, ep, u) + delta := time.Now().Sub(preTime) + post = u(func(f *FSReplication) { + if len(f.pending) == 0 { + return + } + nextStepDate = f.pending[0].to.SnapshotTime() + }) + + GetLogger(ctx). + WithField("fs", f.fs). + WithField("transition", fmt.Sprintf("%s => %s", pre, post)). + WithField("duration", delta). 
+ Debug("fsr step taken") + + return post, nextStepDate +} + +type fsrUpdater func(func(fsr *FSReplication)) FSReplicationState + +type fsrsf func(ctx context.Context, ep EndpointPair, u fsrUpdater) fsrsf + +func fsrsfReady(ctx context.Context, ep EndpointPair, u fsrUpdater) fsrsf { + + var current *FSReplicationStep + s := u(func(f *FSReplication) { + if len(f.pending) == 0 { + f.state = FSCompleted + return + } + current = f.pending[0] + }) + if s != FSReady { + return s.fsrsf() + } + + stepState := current.do(ctx, ep) + + return u(func(f *FSReplication) { + switch stepState { + case StepCompleted: + f.completed = append(f.completed, current) + f.pending = f.pending[1:] + if len(f.pending) > 0 { + f.state = FSReady + } else { + f.state = FSCompleted + } + case StepRetry: + f.retryWaitUntil = time.Now().Add(10 * time.Second) // FIXME make configurable + f.state = FSRetryWait + case StepPermanentError: + f.state = FSPermanentError + f.err = errors.New("a replication step failed with a permanent error") + default: + panic(f) + } + }).fsrsf() +} + +func fsrsfRetryWait(ctx context.Context, ep EndpointPair, u fsrUpdater) fsrsf { + var sleepUntil time.Time + u(func(f *FSReplication) { + sleepUntil = f.retryWaitUntil + }) + t := time.NewTimer(sleepUntil.Sub(time.Now())) + defer t.Stop() + select { + case <-ctx.Done(): + return u(func(f *FSReplication) { + f.state = FSPermanentError + f.err = ctx.Err() + }).fsrsf() + case <-t.C: + } + return u(func(f *FSReplication) { + f.state = FSReady + }).fsrsf() +} + +// access to fsr's members must be exclusive +func (fsr *FSReplication) Report() *FilesystemReplicationReport { + fsr.lock.Lock() + defer fsr.lock.Unlock() + + rep := FilesystemReplicationReport{ + Filesystem: fsr.fs, + Status: fsr.state.String(), + } + + if fsr.state&FSPermanentError != 0 { + rep.Problem = fsr.err.Error() + return &rep + } + + rep.Completed = make([]*StepReport, len(fsr.completed)) + for i := range fsr.completed { + rep.Completed[i] = fsr.completed[i].Report() + } + rep.Pending = make([]*StepReport, len(fsr.pending)) + for i := range fsr.pending { + rep.Pending[i] = fsr.pending[i].Report() + } + return &rep +} + +func (s *FSReplicationStep) do(ctx context.Context, ep EndpointPair) FSReplicationStepState { + + fs := s.fsrep.fs + + log := GetLogger(ctx). + WithField("filesystem", fs). 
+		WithField("step", s.String())
+
+	updateStateError := func(err error) FSReplicationStepState {
+		s.lock.Lock()
+		defer s.lock.Unlock()
+
+		s.err = err
+		switch err {
+		case io.EOF:
+			fallthrough
+		case io.ErrUnexpectedEOF:
+			fallthrough
+		case io.ErrClosedPipe:
+			s.state = StepRetry
+			return s.state
+		}
+		if _, ok := err.(net.Error); ok {
+			s.state = StepRetry
+			return s.state
+		}
+		s.state = StepPermanentError
+		return s.state
+	}
+
+	updateStateCompleted := func() FSReplicationStepState {
+		s.lock.Lock()
+		defer s.lock.Unlock()
+		s.err = nil
+		s.state = StepCompleted
+		return s.state
+	}
+
+	var sr *pdu.SendReq
+	if s.from == nil {
+		sr = &pdu.SendReq{
+			Filesystem: fs,
+			From: s.to.RelName(), // FIXME fix protocol to use To, like zfs does internally
+		}
+	} else {
+		sr = &pdu.SendReq{
+			Filesystem: fs,
+			From: s.from.RelName(),
+			To: s.to.RelName(),
+		}
+	}
+
+	log.WithField("request", sr).Debug("initiate send request")
+	sres, sstream, err := ep.Sender().Send(ctx, sr)
+	if err != nil {
+		log.WithError(err).Error("send request failed")
+		return updateStateError(err)
+	}
+	if sstream == nil {
+		err := errors.New("send request did not return a stream, broken endpoint implementation")
+		return updateStateError(err)
+	}
+
+	rr := &pdu.ReceiveReq{
+		Filesystem: fs,
+		ClearResumeToken: !sres.UsedResumeToken,
+	}
+	log.WithField("request", rr).Debug("initiate receive request")
+	err = ep.Receiver().Receive(ctx, rr, sstream)
+	if err != nil {
+		log.WithError(err).Error("receive request failed (might also be error on sender)")
+		sstream.Close()
+		// This failure could be due to
+		//  - an unexpected exit of ZFS on the sending side
+		//  - an unexpected exit of ZFS on the receiving side
+		//  - a connectivity issue
+		return updateStateError(err)
+	}
+	log.Info("receive finished")
+	return updateStateCompleted()
+
+}
+
+func (s *FSReplicationStep) String() string {
+	if s.from == nil { // FIXME: ZFS semantics are that to is nil on non-incremental send
+		return fmt.Sprintf("%s%s (full)", s.fsrep.fs, s.to.RelName())
+	} else {
+		return fmt.Sprintf("%s(%s => %s)", s.fsrep.fs, s.from.RelName(), s.to.RelName())
+	}
+}
+
+func (step *FSReplicationStep) Report() *StepReport {
+	var from string // FIXME follow same convention as ZFS: to should be nil on full send
+	if step.from != nil {
+		from = step.from.RelName()
+	}
+	rep := StepReport{
+		From: from,
+		To: step.to.RelName(),
+		Status: step.state.String(),
+	}
+	return &rep
+}
+
diff --git a/cmd/replication.v2/fsreplicationstate_string.go b/cmd/replication/internal/fsfsm/fsreplicationstate_string.go
similarity index 97%
rename from cmd/replication.v2/fsreplicationstate_string.go
rename to cmd/replication/internal/fsfsm/fsreplicationstate_string.go
index 6002737..225fee9 100644
--- a/cmd/replication.v2/fsreplicationstate_string.go
+++ b/cmd/replication/internal/fsfsm/fsreplicationstate_string.go
@@ -1,6 +1,6 @@
 // Code generated by "stringer -type=FSReplicationState"; DO NOT EDIT.
 
-package replication
+package fsfsm
 
 import "strconv"
 
diff --git a/cmd/replication.v2/fsreplicationstepstate_string.go b/cmd/replication/internal/fsfsm/fsreplicationstepstate_string.go
similarity index 97%
rename from cmd/replication.v2/fsreplicationstepstate_string.go
rename to cmd/replication/internal/fsfsm/fsreplicationstepstate_string.go
index 3d075c1..3d64be0 100644
--- a/cmd/replication.v2/fsreplicationstepstate_string.go
+++ b/cmd/replication/internal/fsfsm/fsreplicationstepstate_string.go
@@ -1,6 +1,6 @@
 // Code generated by "stringer -type=FSReplicationStepState"; DO NOT EDIT.
 
-package replication +package fsfsm import "strconv" diff --git a/cmd/replication.v2/diff.go b/cmd/replication/internal/mainfsm/diff.go similarity index 98% rename from cmd/replication.v2/diff.go rename to cmd/replication/internal/mainfsm/diff.go index bb6cb17..f1f4af6 100644 --- a/cmd/replication.v2/diff.go +++ b/cmd/replication/internal/mainfsm/diff.go @@ -1,7 +1,9 @@ -package replication +package mainfsm import ( "sort" + + . "github.com/zrepl/zrepl/cmd/replication/pdu" ) type ConflictNoCommonAncestor struct { diff --git a/cmd/replication/internal/mainfsm/mainfsm.go b/cmd/replication/internal/mainfsm/mainfsm.go new file mode 100644 index 0000000..883d6d5 --- /dev/null +++ b/cmd/replication/internal/mainfsm/mainfsm.go @@ -0,0 +1,342 @@ +package mainfsm + +import ( + "context" + "errors" + "fmt" + "math/bits" + "sync" + "time" + + . "github.com/zrepl/zrepl/cmd/replication/common" + "github.com/zrepl/zrepl/cmd/replication/pdu" + "github.com/zrepl/zrepl/cmd/replication/internal/fsfsm" + . "github.com/zrepl/zrepl/cmd/replication/internal/mainfsm/queue" +) + +//go:generate stringer -type=ReplicationState +type ReplicationState uint + +const ( + Planning ReplicationState = 1 << iota + PlanningError + Working + WorkingWait + Completed + ContextDone +) + +func (s ReplicationState) rsf() replicationStateFunc { + idx := bits.TrailingZeros(uint(s)) + if idx == bits.UintSize { + panic(s) // invalid value + } + m := []replicationStateFunc{ + rsfPlanning, + rsfPlanningError, + rsfWorking, + rsfWorkingWait, + nil, + nil, + } + return m[idx] +} + +type Replication struct { + // lock protects all fields of this struct (but not the fields behind pointers!) + lock sync.Mutex + + state ReplicationState + + // Working, WorkingWait, Completed, ContextDone + queue *ReplicationQueue + completed []*fsfsm.FSReplication + active *ReplicationQueueItemHandle + + // PlanningError + planningError error + + // ContextDone + contextError error + + // PlanningError, WorkingWait + sleepUntil time.Time +} + +type Report struct { + Status string + Problem string + Completed []*fsfsm.FilesystemReplicationReport + Pending []*fsfsm.FilesystemReplicationReport + Active *fsfsm.FilesystemReplicationReport +} + + +func NewReplication() *Replication { + r := Replication{ + state: Planning, + } + return &r +} + +type replicationUpdater func(func(*Replication)) (newState ReplicationState) +type replicationStateFunc func(context.Context, EndpointPair, replicationUpdater) replicationStateFunc + +func (r *Replication) Drive(ctx context.Context, ep EndpointPair) { + + var u replicationUpdater = func(f func(*Replication)) ReplicationState { + r.lock.Lock() + defer r.lock.Unlock() + if f != nil { + f(r) + } + return r.state + } + + var s replicationStateFunc = rsfPlanning + var pre, post ReplicationState + for s != nil { + preTime := time.Now() + pre = u(nil) + s = s(ctx, ep, u) + delta := time.Now().Sub(preTime) + post = u(nil) + GetLogger(ctx). + WithField("transition", fmt.Sprintf("%s => %s", pre, post)). + WithField("duration", delta). + Debug("main state transition") + } + + GetLogger(ctx). + WithField("final_state", post). 
+		Debug("main final state")
+}
+
+func resolveConflict(conflict error) (path []*pdu.FilesystemVersion, msg string) {
+	if noCommonAncestor, ok := conflict.(*ConflictNoCommonAncestor); ok {
+		if len(noCommonAncestor.SortedReceiverVersions) == 0 {
+			// FIXME hard-coded replication policy: most recent
+			// snapshot as source
+			var mostRecentSnap *pdu.FilesystemVersion
+			for n := len(noCommonAncestor.SortedSenderVersions) - 1; n >= 0; n-- {
+				if noCommonAncestor.SortedSenderVersions[n].Type == pdu.FilesystemVersion_Snapshot {
+					mostRecentSnap = noCommonAncestor.SortedSenderVersions[n]
+					break
+				}
+			}
+			if mostRecentSnap == nil {
+				return nil, "no snapshots available on sender side"
+			}
+			return []*pdu.FilesystemVersion{mostRecentSnap}, fmt.Sprintf("start replication at most recent snapshot %s", mostRecentSnap.RelName())
+		}
+	}
+	return nil, "no automated way to handle conflict type"
+}
+
+func rsfPlanning(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc {
+
+	log := GetLogger(ctx)
+
+	handlePlanningError := func(err error) replicationStateFunc {
+		return u(func(r *Replication) {
+			r.planningError = err
+			r.state = PlanningError
+		}).rsf()
+	}
+
+	sfss, err := ep.Sender().ListFilesystems(ctx)
+	if err != nil {
+		log.WithError(err).Error("error listing sender filesystems")
+		return handlePlanningError(err)
+	}
+
+	rfss, err := ep.Receiver().ListFilesystems(ctx)
+	if err != nil {
+		log.WithError(err).Error("error listing receiver filesystems")
+		return handlePlanningError(err)
+	}
+
+	q := NewReplicationQueue()
+	mainlog := log
+	for _, fs := range sfss {
+
+		log := mainlog.WithField("filesystem", fs.Path)
+
+		log.Info("assessing filesystem")
+
+		sfsvs, err := ep.Sender().ListFilesystemVersions(ctx, fs.Path)
+		if err != nil {
+			log.WithError(err).Error("cannot get remote filesystem versions")
+			return handlePlanningError(err)
+		}
+
+		if len(sfsvs) < 1 {
+			err := errors.New("sender does not have any versions")
+			log.Error(err.Error())
+			q.Add(fsfsm.NewFSReplicationWithPermanentError(fs.Path, err))
+			continue
+		}
+
+		receiverFSExists := false
+		for _, rfs := range rfss {
+			if rfs.Path == fs.Path {
+				receiverFSExists = true
+			}
+		}
+
+		var rfsvs []*pdu.FilesystemVersion
+		if receiverFSExists {
+			rfsvs, err = ep.Receiver().ListFilesystemVersions(ctx, fs.Path)
+			if err != nil {
+				if _, ok := err.(*FilteredError); ok {
+					log.Info("receiver ignores filesystem")
+					continue
+				}
+				log.WithError(err).Error("receiver error")
+				return handlePlanningError(err)
+			}
+		} else {
+			rfsvs = []*pdu.FilesystemVersion{}
+		}
+
+		path, conflict := IncrementalPath(rfsvs, sfsvs)
+		if conflict != nil {
+			var msg string
+			path, msg = resolveConflict(conflict) // no shadowing allowed!
+			if path != nil {
+				log.WithField("conflict", conflict).Info("conflict")
+				log.WithField("resolution", msg).Info("automatically resolved")
+			} else {
+				log.WithField("conflict", conflict).Error("conflict")
+				log.WithField("problem", msg).Error("cannot resolve conflict")
+			}
+		}
+		if path == nil {
+			q.Add(fsfsm.NewFSReplicationWithPermanentError(fs.Path, conflict))
+			continue
+		}
+
+		fsrfsm := fsfsm.BuildFSReplication(fs.Path)
+		if len(path) == 1 {
+			fsrfsm.AddStep(nil, path[0])
+		} else {
+			for i := 0; i < len(path)-1; i++ {
+				fsrfsm.AddStep(path[i], path[i+1])
+			}
+		}
+		qitem := fsrfsm.Done()
+		q.Add(qitem)
+	}
+
+	return u(func(r *Replication) {
+		r.completed = nil
+		r.queue = q
+		r.planningError = nil
+		r.state = Working
+	}).rsf()
+}
+
+func rsfPlanningError(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc {
+	sleepTime := 10 * time.Second
+	u(func(r *Replication) {
+		r.sleepUntil = time.Now().Add(sleepTime)
+	})
+	t := time.NewTimer(sleepTime) // FIXME make constant configurable
+	defer t.Stop()
+	select {
+	case <-ctx.Done():
+		return u(func(r *Replication) {
+			r.state = ContextDone
+			r.contextError = ctx.Err()
+		}).rsf()
+	case <-t.C:
+		return u(func(r *Replication) {
+			r.state = Planning
+		}).rsf()
+	}
+}
+
+func rsfWorking(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc {
+
+	var active *ReplicationQueueItemHandle
+	rsfNext := u(func(r *Replication) {
+		done, next := r.queue.GetNext()
+		r.completed = append(r.completed, done...)
+		if next == nil {
+			r.state = Completed
+		}
+		r.active = next
+		active = next
+	}).rsf()
+
+	if active == nil {
+		return rsfNext
+	}
+
+	state, nextStepDate := active.GetFSReplication().TakeStep(ctx, ep)
+
+	return u(func(r *Replication) {
+		active.Update(state, nextStepDate)
+		r.active = nil
+	}).rsf()
+}
+
+func rsfWorkingWait(ctx context.Context, ep EndpointPair, u replicationUpdater) replicationStateFunc {
+	sleepTime := 10 * time.Second
+	u(func(r *Replication) {
+		r.sleepUntil = time.Now().Add(sleepTime)
+	})
+	t := time.NewTimer(sleepTime)
+	defer t.Stop()
+	select {
+	case <-ctx.Done():
+		return u(func(r *Replication) {
+			r.state = ContextDone
+			r.contextError = ctx.Err()
+		}).rsf()
+	case <-t.C:
+		return u(func(r *Replication) {
+			r.state = Working
+		}).rsf()
+	}
+}
+
+func (r *Replication) Report() *Report {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+
+	rep := Report{
+		Status: r.state.String(),
+	}
+
+	if r.state&(Planning|PlanningError|ContextDone) != 0 {
+		switch r.state {
+		case PlanningError:
+			rep.Problem = r.planningError.Error()
+		case ContextDone:
+			rep.Problem = r.contextError.Error()
+		}
+		return &rep
+	}
+
+	rep.Pending = make([]*fsfsm.FilesystemReplicationReport, 0, r.queue.Len())
+	rep.Completed = make([]*fsfsm.FilesystemReplicationReport, 0, len(r.completed)) // room for active (potentially)
+
+	var active *fsfsm.FSReplication
+	if r.active != nil {
+		active = r.active.GetFSReplication()
+		rep.Active = active.Report()
+	}
+	r.queue.Foreach(func(h *ReplicationQueueItemHandle) {
+		fsr := h.GetFSReplication()
+		if active != fsr {
+			rep.Pending = append(rep.Pending, fsr.Report())
+		}
+	})
+	for _, fsr := range r.completed {
+		rep.Completed = append(rep.Completed, fsr.Report())
+	}
+
+	return &rep
+}
+
diff --git a/cmd/replication/internal/mainfsm/queue/queue.go b/cmd/replication/internal/mainfsm/queue/queue.go
new file mode 100644
index 0000000..e2b588d
--- /dev/null
+++ b/cmd/replication/internal/mainfsm/queue/queue.go
@@ -0,0 +1,124 @@
+package queue
+
+import (
+	"time"
+	"sort"
+
+	. "github.com/zrepl/zrepl/cmd/replication/internal/fsfsm"
+)
+
+type replicationQueueItem struct {
+	retriesSinceLastError int
+	// duplicates fsr.state to avoid accessing and locking fsr
+	state FSReplicationState
+	// duplicates fsr.current.nextStepDate to avoid accessing & locking fsr
+	nextStepDate time.Time
+
+	fsr *FSReplication
+}
+
+type ReplicationQueue []*replicationQueueItem
+
+func NewReplicationQueue() *ReplicationQueue {
+	q := make(ReplicationQueue, 0)
+	return &q
+}
+
+func (q ReplicationQueue) Len() int { return len(q) }
+func (q ReplicationQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
+
+type lessmapEntry struct {
+	prio int
+	less func(a, b *replicationQueueItem) bool
+}
+
+var lessmap = map[FSReplicationState]lessmapEntry{
+	FSReady: {
+		prio: 0,
+		less: func(a, b *replicationQueueItem) bool {
+			return a.nextStepDate.Before(b.nextStepDate)
+		},
+	},
+	FSRetryWait: {
+		prio: 1,
+		less: func(a, b *replicationQueueItem) bool {
+			return a.retriesSinceLastError < b.retriesSinceLastError
+		},
+	},
+}
+
+func (q ReplicationQueue) Less(i, j int) bool {
+
+	a, b := q[i], q[j]
+	al, aok := lessmap[a.state]
+	if !aok {
+		panic(a)
+	}
+	bl, bok := lessmap[b.state]
+	if !bok {
+		panic(b)
+	}
+
+	if al.prio != bl.prio {
+		return al.prio < bl.prio
+	}
+
+	return al.less(a, b)
+}
+
+func (q *ReplicationQueue) sort() (done []*FSReplication) {
+	// pre-scan for everything that is not ready
+	newq := make(ReplicationQueue, 0, len(*q))
+	done = make([]*FSReplication, 0, len(*q))
+	for _, qitem := range *q {
+		if _, ok := lessmap[qitem.state]; !ok {
+			done = append(done, qitem.fsr)
+		} else {
+			newq = append(newq, qitem)
+		}
+	}
+	sort.Stable(newq) // stable to avoid flickering in reports
+	*q = newq
+	return done
+}
+
+// next remains valid until the next call to GetNext()
+func (q *ReplicationQueue) GetNext() (done []*FSReplication, next *ReplicationQueueItemHandle) {
+	done = q.sort()
+	if len(*q) == 0 {
+		return done, nil
+	}
+	next = &ReplicationQueueItemHandle{(*q)[0]}
+	return done, next
+}
+
+func (q *ReplicationQueue) Add(fsr *FSReplication) {
+	*q = append(*q, &replicationQueueItem{
+		fsr: fsr,
+		state: fsr.State(),
+	})
+}
+
+func (q *ReplicationQueue) Foreach(fu func(*ReplicationQueueItemHandle)) {
+	for _, qitem := range *q {
+		fu(&ReplicationQueueItemHandle{qitem})
+	}
+}
+
+type ReplicationQueueItemHandle struct {
+	i *replicationQueueItem
+}
+
+func (h ReplicationQueueItemHandle) GetFSReplication() *FSReplication {
+	return h.i.fsr
+}
+
+func (h ReplicationQueueItemHandle) Update(newState FSReplicationState, nextStepDate time.Time) {
+	h.i.state = newState
+	h.i.nextStepDate = nextStepDate
+	if h.i.state&FSReady != 0 {
+		h.i.retriesSinceLastError = 0
+	} else if h.i.state&FSRetryWait != 0 {
+		h.i.retriesSinceLastError++
+	}
+}
diff --git a/cmd/replication.v2/replicationstate_string.go b/cmd/replication/internal/mainfsm/replicationstate_string.go
similarity index 97%
rename from cmd/replication.v2/replicationstate_string.go
rename to cmd/replication/internal/mainfsm/replicationstate_string.go
index e4381ba..46511af 100644
--- a/cmd/replication.v2/replicationstate_string.go
+++ b/cmd/replication/internal/mainfsm/replicationstate_string.go
@@ -1,6 +1,6 @@
 // Code generated by "stringer -type=ReplicationState"; DO NOT EDIT.
 
-package replication +package mainfsm import "strconv" diff --git a/cmd/replication.v2/pdu.pb.go b/cmd/replication/pdu/pdu.pb.go similarity index 72% rename from cmd/replication.v2/pdu.pb.go rename to cmd/replication/pdu/pdu.pb.go index 1d382bd..d9bed35 100644 --- a/cmd/replication.v2/pdu.pb.go +++ b/cmd/replication/pdu/pdu.pb.go @@ -2,7 +2,7 @@ // source: pdu.proto /* -Package replication is a generated protocol buffer package. +Package pdu is a generated protocol buffer package. It is generated from these files: pdu.proto @@ -20,7 +20,7 @@ It has these top-level messages: ReceiveReq ReceiveRes */ -package replication +package pdu import proto "github.com/golang/protobuf/proto" import fmt "fmt" @@ -141,7 +141,7 @@ func (m *ListFilesystemVersionsRes) GetVersions() []*FilesystemVersion { } type FilesystemVersion struct { - Type FilesystemVersion_VersionType `protobuf:"varint,1,opt,name=Type,enum=replication.FilesystemVersion_VersionType" json:"Type,omitempty"` + Type FilesystemVersion_VersionType `protobuf:"varint,1,opt,name=Type,enum=pdu.FilesystemVersion_VersionType" json:"Type,omitempty"` Name string `protobuf:"bytes,2,opt,name=Name" json:"Name,omitempty"` Guid uint64 `protobuf:"varint,3,opt,name=Guid" json:"Guid,omitempty"` CreateTXG uint64 `protobuf:"varint,4,opt,name=CreateTXG" json:"CreateTXG,omitempty"` @@ -191,7 +191,8 @@ func (m *FilesystemVersion) GetCreation() string { type SendReq struct { Filesystem string `protobuf:"bytes,1,opt,name=Filesystem" json:"Filesystem,omitempty"` From string `protobuf:"bytes,2,opt,name=From" json:"From,omitempty"` - To string `protobuf:"bytes,3,opt,name=To" json:"To,omitempty"` + // May be empty / null to request a full transfer of From + To string `protobuf:"bytes,3,opt,name=To" json:"To,omitempty"` // If ResumeToken is not empty, the resume token that CAN be tried for 'zfs send' by the sender. 
// The sender MUST indicate in SendRes.UsedResumeToken // If it does not work, the sender SHOULD clear the resume token on their side @@ -334,51 +335,50 @@ func (*ReceiveRes) ProtoMessage() {} func (*ReceiveRes) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } func init() { - proto.RegisterType((*ListFilesystemReq)(nil), "replication.ListFilesystemReq") - proto.RegisterType((*ListFilesystemRes)(nil), "replication.ListFilesystemRes") - proto.RegisterType((*Filesystem)(nil), "replication.Filesystem") - proto.RegisterType((*ListFilesystemVersionsReq)(nil), "replication.ListFilesystemVersionsReq") - proto.RegisterType((*ListFilesystemVersionsRes)(nil), "replication.ListFilesystemVersionsRes") - proto.RegisterType((*FilesystemVersion)(nil), "replication.FilesystemVersion") - proto.RegisterType((*SendReq)(nil), "replication.SendReq") - proto.RegisterType((*Property)(nil), "replication.Property") - proto.RegisterType((*SendRes)(nil), "replication.SendRes") - proto.RegisterType((*ReceiveReq)(nil), "replication.ReceiveReq") - proto.RegisterType((*ReceiveRes)(nil), "replication.ReceiveRes") - proto.RegisterEnum("replication.FilesystemVersion_VersionType", FilesystemVersion_VersionType_name, FilesystemVersion_VersionType_value) + proto.RegisterType((*ListFilesystemReq)(nil), "pdu.ListFilesystemReq") + proto.RegisterType((*ListFilesystemRes)(nil), "pdu.ListFilesystemRes") + proto.RegisterType((*Filesystem)(nil), "pdu.Filesystem") + proto.RegisterType((*ListFilesystemVersionsReq)(nil), "pdu.ListFilesystemVersionsReq") + proto.RegisterType((*ListFilesystemVersionsRes)(nil), "pdu.ListFilesystemVersionsRes") + proto.RegisterType((*FilesystemVersion)(nil), "pdu.FilesystemVersion") + proto.RegisterType((*SendReq)(nil), "pdu.SendReq") + proto.RegisterType((*Property)(nil), "pdu.Property") + proto.RegisterType((*SendRes)(nil), "pdu.SendRes") + proto.RegisterType((*ReceiveReq)(nil), "pdu.ReceiveReq") + proto.RegisterType((*ReceiveRes)(nil), "pdu.ReceiveRes") + proto.RegisterEnum("pdu.FilesystemVersion_VersionType", FilesystemVersion_VersionType_name, FilesystemVersion_VersionType_value) } func init() { proto.RegisterFile("pdu.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 454 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0x4d, 0x6f, 0xd3, 0x40, - 0x10, 0x65, 0x53, 0xa7, 0x38, 0xe3, 0xaa, 0xa4, 0x4b, 0x11, 0x06, 0xa1, 0x2a, 0xda, 0x53, 0xe8, - 0x21, 0x87, 0x02, 0x07, 0x40, 0xe2, 0xd0, 0xa2, 0xf6, 0x82, 0xaa, 0x6a, 0x6b, 0x4a, 0xaf, 0xa6, - 0x1e, 0xa9, 0x4b, 0x62, 0xaf, 0xbb, 0x63, 0x23, 0xe5, 0xe7, 0xf0, 0xcf, 0xf8, 0x29, 0xc8, 0x53, - 0x3b, 0xd9, 0x26, 0x2a, 0xca, 0xc9, 0xf3, 0xde, 0x7c, 0xbd, 0x7d, 0xeb, 0x85, 0x41, 0x99, 0xd5, - 0x93, 0xd2, 0xd9, 0xca, 0xca, 0xc8, 0x61, 0x39, 0x33, 0x37, 0x69, 0x65, 0x6c, 0xa1, 0x9e, 0xc3, - 0xde, 0x37, 0x43, 0xd5, 0xa9, 0x99, 0x21, 0xcd, 0xa9, 0xc2, 0x5c, 0xe3, 0x9d, 0x3a, 0x5f, 0x27, - 0x49, 0x7e, 0x84, 0x68, 0x49, 0x50, 0x2c, 0x46, 0x5b, 0xe3, 0xe8, 0xe8, 0xe5, 0xc4, 0x1b, 0x36, - 0xf1, 0x1a, 0xfc, 0x5a, 0x75, 0x0c, 0xb0, 0x84, 0x52, 0x42, 0x70, 0x91, 0x56, 0xb7, 0xb1, 0x18, - 0x89, 0xf1, 0x40, 0x73, 0x2c, 0x47, 0x10, 0x69, 0xa4, 0x3a, 0xc7, 0xc4, 0x4e, 0xb1, 0x88, 0x7b, - 0x9c, 0xf2, 0x29, 0xf5, 0x19, 0x5e, 0x3d, 0xd4, 0x74, 0x85, 0x8e, 0x8c, 0x2d, 0x48, 0xe3, 0x9d, - 0x3c, 0xf0, 0x17, 0xb4, 0x83, 0x3d, 0x46, 0xfd, 0x78, 0xbc, 0x99, 0xe4, 0x27, 0x08, 0x3b, 0xd8, - 0x9e, 0xea, 0xe0, 0x91, 0x53, 0xb5, 0x65, 0x7a, 0x51, 0xaf, 0xfe, 0x0a, 0xd8, 0x5b, 0xcb, 0xcb, - 0x2f, 0x10, 
0x24, 0xf3, 0x12, 0x59, 0xc8, 0xee, 0xd1, 0xe1, 0xff, 0xa7, 0x4d, 0xda, 0x6f, 0xd3, - 0xa1, 0xb9, 0xaf, 0x71, 0xe8, 0x3c, 0xcd, 0xb1, 0xb5, 0x81, 0xe3, 0x86, 0x3b, 0xab, 0x4d, 0x16, - 0x6f, 0x8d, 0xc4, 0x38, 0xd0, 0x1c, 0xcb, 0x37, 0x30, 0x38, 0x71, 0x98, 0x56, 0x98, 0x5c, 0x9f, - 0xc5, 0x01, 0x27, 0x96, 0x84, 0x7c, 0x0d, 0x21, 0x03, 0x63, 0x8b, 0xb8, 0xcf, 0x93, 0x16, 0x58, - 0xbd, 0x85, 0xc8, 0x5b, 0x2b, 0x77, 0x20, 0xbc, 0x2c, 0xd2, 0x92, 0x6e, 0x6d, 0x35, 0x7c, 0xd2, - 0xa0, 0x63, 0x6b, 0xa7, 0x79, 0xea, 0xa6, 0x43, 0xa1, 0xfe, 0x08, 0x78, 0x7a, 0x89, 0x45, 0xb6, - 0x81, 0xcf, 0x8d, 0xc8, 0x53, 0x67, 0xf3, 0x4e, 0x78, 0x13, 0xcb, 0x5d, 0xe8, 0x25, 0x96, 0x65, - 0x0f, 0x74, 0x2f, 0xb1, 0xab, 0x57, 0x1d, 0xac, 0x5d, 0x35, 0x0b, 0xb7, 0x79, 0xe9, 0x90, 0x88, - 0x85, 0x87, 0x7a, 0x81, 0xe5, 0x3e, 0xf4, 0xbf, 0x62, 0x56, 0x97, 0xf1, 0x36, 0x27, 0xee, 0x81, - 0x7a, 0x0f, 0xe1, 0x85, 0xb3, 0x25, 0xba, 0x6a, 0xbe, 0x30, 0x4f, 0x78, 0xe6, 0xed, 0x43, 0xff, - 0x2a, 0x9d, 0xd5, 0x9d, 0xa3, 0xf7, 0x40, 0xfd, 0xea, 0x0e, 0x46, 0x72, 0x0c, 0xcf, 0xbe, 0x13, - 0x66, 0xbe, 0x30, 0xc1, 0x0b, 0x56, 0x69, 0xf9, 0x01, 0xa0, 0x5d, 0x65, 0x90, 0xe2, 0x1e, 0xff, - 0x2f, 0x2f, 0x1e, 0xdc, 0x70, 0xa7, 0x44, 0x7b, 0x85, 0xea, 0x1a, 0x40, 0xe3, 0x0d, 0x9a, 0xdf, - 0xb8, 0x89, 0x8f, 0x87, 0x30, 0x3c, 0x99, 0x61, 0xea, 0x56, 0xdf, 0x44, 0xa8, 0xd7, 0x78, 0xb5, - 0xe3, 0x4d, 0xa6, 0x9f, 0xdb, 0xfc, 0xc6, 0xdf, 0xfd, 0x0b, 0x00, 0x00, 0xff, 0xff, 0xa4, 0x5a, - 0xf6, 0xa7, 0xf0, 0x03, 0x00, 0x00, + // 445 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xc1, 0x6e, 0xd3, 0x40, + 0x10, 0x65, 0x13, 0xa7, 0x38, 0x93, 0xd2, 0xa6, 0x4b, 0x85, 0x0c, 0x42, 0x28, 0xda, 0x53, 0x40, + 0x22, 0x12, 0x01, 0x71, 0xe1, 0xd6, 0xa2, 0xf4, 0x82, 0xa0, 0xda, 0x9a, 0xaa, 0x57, 0x17, 0x8f, + 0x54, 0x2b, 0xb1, 0x77, 0xbb, 0x63, 0x23, 0xe5, 0x73, 0xf8, 0x2b, 0x3e, 0x07, 0x79, 0x6a, 0x27, + 0x4b, 0x0c, 0x52, 0x4e, 0x99, 0xf7, 0x66, 0x32, 0xf3, 0xe6, 0xcd, 0x1a, 0x86, 0x36, 0xad, 0x66, + 0xd6, 0x99, 0xd2, 0xc8, 0xbe, 0x4d, 0x2b, 0xf5, 0x14, 0x4e, 0xbe, 0x64, 0x54, 0x2e, 0xb2, 0x15, + 0xd2, 0x9a, 0x4a, 0xcc, 0x35, 0xde, 0xab, 0x45, 0x97, 0x24, 0xf9, 0x0e, 0x46, 0x5b, 0x82, 0x22, + 0x31, 0xe9, 0x4f, 0x47, 0xf3, 0xe3, 0x59, 0xdd, 0xcf, 0x2b, 0xf4, 0x6b, 0xd4, 0x19, 0xc0, 0x16, + 0x4a, 0x09, 0xc1, 0x65, 0x52, 0xde, 0x45, 0x62, 0x22, 0xa6, 0x43, 0xcd, 0xb1, 0x9c, 0xc0, 0x48, + 0x23, 0x55, 0x39, 0xc6, 0x66, 0x89, 0x45, 0xd4, 0xe3, 0x94, 0x4f, 0xa9, 0x4f, 0xf0, 0xfc, 0x6f, + 0x2d, 0xd7, 0xe8, 0x28, 0x33, 0x05, 0x69, 0xbc, 0x97, 0xaf, 0xfc, 0x01, 0x4d, 0x63, 0x8f, 0x51, + 0xdf, 0xfe, 0xff, 0x67, 0x92, 0x73, 0x08, 0x5b, 0xd8, 0x6c, 0xf3, 0x6c, 0x67, 0x9b, 0x26, 0xad, + 0x37, 0x75, 0xea, 0xb7, 0x80, 0x93, 0x4e, 0x5e, 0x7e, 0x84, 0x20, 0x5e, 0x5b, 0x64, 0x01, 0x47, + 0x73, 0xf5, 0xef, 0x2e, 0xb3, 0xe6, 0xb7, 0xae, 0xd4, 0x5c, 0x5f, 0x3b, 0xf2, 0x35, 0xc9, 0xb1, + 0x59, 0x9b, 0xe3, 0x9a, 0xbb, 0xa8, 0xb2, 0x34, 0xea, 0x4f, 0xc4, 0x34, 0xd0, 0x1c, 0xcb, 0x97, + 0x30, 0x3c, 0x77, 0x98, 0x94, 0x18, 0xdf, 0x5c, 0x44, 0x01, 0x27, 0xb6, 0x84, 0x7c, 0x01, 0x21, + 0x83, 0xcc, 0x14, 0xd1, 0x80, 0x3b, 0x6d, 0xb0, 0x7a, 0x0d, 0x23, 0x6f, 0xac, 0x3c, 0x84, 0xf0, + 0xaa, 0x48, 0x2c, 0xdd, 0x99, 0x72, 0xfc, 0xa8, 0x46, 0x67, 0xc6, 0x2c, 0xf3, 0xc4, 0x2d, 0xc7, + 0x42, 0xfd, 0x12, 0xf0, 0xf8, 0x0a, 0x8b, 0x74, 0x0f, 0x5f, 0x6b, 0x91, 0x0b, 0x67, 0xf2, 0x56, + 0x78, 0x1d, 0xcb, 0x23, 0xe8, 0xc5, 0x86, 0x65, 0x0f, 0x75, 0x2f, 0x36, 0xbb, 0xa7, 0x0d, 0x3a, + 0xa7, 0x65, 0xe1, 0x26, 0xb7, 0x0e, 0x89, 0x58, 
0x78, 0xa8, 0x37, 0x58, 0x9e, 0xc2, 0xe0, 0x33, + 0xa6, 0x95, 0x8d, 0x0e, 0x38, 0xf1, 0x00, 0xd4, 0x07, 0x08, 0x2f, 0x9d, 0xb1, 0xe8, 0xca, 0xf5, + 0xc6, 0x3c, 0xe1, 0x99, 0x77, 0x0a, 0x83, 0xeb, 0x64, 0x55, 0xb5, 0x8e, 0x3e, 0x00, 0x75, 0xdb, + 0x2e, 0x46, 0x72, 0x0a, 0xc7, 0xdf, 0x09, 0x53, 0x5f, 0x98, 0xe0, 0x01, 0xbb, 0xb4, 0x7c, 0x0b, + 0xd0, 0x8c, 0xca, 0x90, 0xa2, 0x1e, 0xbf, 0x8f, 0x27, 0x7c, 0xd9, 0x56, 0x81, 0xf6, 0x0a, 0xd4, + 0x0d, 0x80, 0xc6, 0x1f, 0x98, 0xfd, 0xc4, 0x7d, 0xfc, 0x7b, 0x03, 0xe3, 0xf3, 0x15, 0x26, 0x6e, + 0xf7, 0xed, 0x87, 0xba, 0xc3, 0xab, 0x43, 0xaf, 0x33, 0xdd, 0x1e, 0xf0, 0xb7, 0xfb, 0xfe, 0x4f, + 0x00, 0x00, 0x00, 0xff, 0xff, 0xfd, 0xc4, 0x8d, 0xb7, 0xc8, 0x03, 0x00, 0x00, } diff --git a/cmd/replication.v2/pdu.proto b/cmd/replication/pdu/pdu.proto similarity index 98% rename from cmd/replication.v2/pdu.proto rename to cmd/replication/pdu/pdu.proto index 72fc56c..cac47b1 100644 --- a/cmd/replication.v2/pdu.proto +++ b/cmd/replication/pdu/pdu.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -package replication; +package pdu; message ListFilesystemReq {} diff --git a/cmd/replication.v2/pdu_extras.go b/cmd/replication/pdu/pdu_extras.go similarity index 87% rename from cmd/replication.v2/pdu_extras.go rename to cmd/replication/pdu/pdu_extras.go index 0848bd6..19e27e8 100644 --- a/cmd/replication.v2/pdu_extras.go +++ b/cmd/replication/pdu/pdu_extras.go @@ -1,4 +1,4 @@ -package replication +package pdu import ( "fmt" @@ -45,6 +45,15 @@ func (v *FilesystemVersion) CreationAsTime() (time.Time, error) { return time.Parse(time.RFC3339, v.Creation) } +// implement fsfsm.FilesystemVersion +func (v *FilesystemVersion) SnapshotTime() time.Time { + t, err := v.CreationAsTime() + if err != nil { + panic(err) // FIXME + } + return t +} + func (v *FilesystemVersion) ZFSFilesystemVersion() *zfs.FilesystemVersion { ct := time.Time{} if v.Creation != "" { @@ -61,4 +70,4 @@ func (v *FilesystemVersion) ZFSFilesystemVersion() *zfs.FilesystemVersion { CreateTXG: v.CreateTXG, Creation: ct, } -} \ No newline at end of file +} diff --git a/cmd/replication.v2/pdu_test.go b/cmd/replication/pdu/pdu_test.go similarity index 98% rename from cmd/replication.v2/pdu_test.go rename to cmd/replication/pdu/pdu_test.go index 3b26572..86e2b49 100644 --- a/cmd/replication.v2/pdu_test.go +++ b/cmd/replication/pdu/pdu_test.go @@ -1,4 +1,4 @@ -package replication +package pdu import ( "testing" diff --git a/cmd/replication/replication.go b/cmd/replication/replication.go new file mode 100644 index 0000000..75b1dd5 --- /dev/null +++ b/cmd/replication/replication.go @@ -0,0 +1,19 @@ +package replication + +import ( + "context" + + "github.com/zrepl/zrepl/cmd/replication/common" + "github.com/zrepl/zrepl/cmd/replication/internal/mainfsm" +) + +type Report = mainfsm.Report + +type Replication interface { + Drive(ctx context.Context, ep common.EndpointPair) + Report() *Report +} + +func NewReplication() Replication { + return mainfsm.NewReplication() +}
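
Usage note (illustrative sketch, not part of the diff): with this layout, a caller uses only the public replication package plus common for endpoint wiring. The package name jobexample and the helper runReplication below are hypothetical; obtaining the sender/receiver endpoints and wrapping them in a common.EndpointPair (e.g. via common.NewEndpointPairPull) is assumed to happen in job setup and is not shown.

package jobexample // hypothetical consumer, for illustration only

import (
	"context"
	"fmt"

	"github.com/zrepl/zrepl/cmd/replication"
	"github.com/zrepl/zrepl/cmd/replication/common"
)

// runReplication drives one replication run and prints the final status.
func runReplication(ctx context.Context, ep common.EndpointPair) {
	rep := replication.NewReplication() // replication.Replication interface value
	rep.Drive(ctx, ep)                  // returns once the main FSM reaches a terminal state (Completed or ContextDone)
	fmt.Println("final status:", rep.Report().Status)
}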