mirror of
https://github.com/zrepl/zrepl.git
synced 2024-12-22 15:11:16 +01:00
Merge branch 'problame/overlapping-dataset-hierarchy-improvements' into 'master'
closes #153
This commit is contained in:
commit
0ab62f197a
@ -5,16 +5,18 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/zrepl/zrepl/daemon/job"
|
||||
"github.com/zrepl/zrepl/daemon/nethelpers"
|
||||
"github.com/zrepl/zrepl/logger"
|
||||
"github.com/zrepl/zrepl/version"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
)
|
||||
|
||||
type controlJob struct {
|
||||
@ -38,6 +40,8 @@ func (j *controlJob) Name() string { return jobNameControl }
|
||||
|
||||
func (j *controlJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }
|
||||
|
||||
func (j *controlJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { return nil, false }
|
||||
|
||||
var promControl struct {
|
||||
requestBegin *prometheus.CounterVec
|
||||
requestFinished *prometheus.HistogramVec
|
||||
|
@ -299,6 +299,15 @@ func (j *ActiveSide) Status() *Status {
|
||||
return &Status{Type: t, JobSpecific: s}
|
||||
}
|
||||
|
||||
func (j *ActiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
|
||||
pull, ok := j.mode.(*modePull)
|
||||
if !ok {
|
||||
_ = j.mode.(*modePush) // make sure we didn't introduce a new job type
|
||||
return nil, false
|
||||
}
|
||||
return pull.rootFS.Copy(), true
|
||||
}
|
||||
|
||||
func (j *ActiveSide) Run(ctx context.Context) {
|
||||
log := GetLogger(ctx)
|
||||
ctx = logging.WithSubsystemLoggers(ctx, log)
|
||||
|
@ -2,6 +2,9 @@ package job
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/zrepl/zrepl/config"
|
||||
)
|
||||
@ -18,6 +21,22 @@ func JobsFromConfig(c *config.Config) ([]Job, error) {
|
||||
}
|
||||
js[i] = j
|
||||
}
|
||||
|
||||
// receiving-side root filesystems must not overlap
|
||||
{
|
||||
rfss := make([]string, len(js))
|
||||
for i, j := range js {
|
||||
jrfs, ok := j.OwnedDatasetSubtreeRoot()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
rfss[i] = jrfs.ToString()
|
||||
}
|
||||
if err := validateReceivingSidesDoNotOverlap(rfss); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return js, nil
|
||||
}
|
||||
|
||||
@ -74,3 +93,27 @@ func buildJob(c *config.Global, in config.JobEnum) (j Job, err error) {
|
||||
return j, nil
|
||||
|
||||
}
|
||||
|
||||
func validateReceivingSidesDoNotOverlap(receivingRootFSs []string) error {
|
||||
if len(receivingRootFSs) == 0 {
|
||||
return nil
|
||||
}
|
||||
rfss := make([]string, len(receivingRootFSs))
|
||||
copy(rfss, receivingRootFSs)
|
||||
sort.Slice(rfss, func(i, j int) bool {
|
||||
return strings.Compare(rfss[i], rfss[j]) == -1
|
||||
})
|
||||
// idea:
|
||||
// no path in rfss must be prefix of another
|
||||
//
|
||||
// rfss is now lexicographically sorted, which means that
|
||||
// if i is prefix of j, i < j (in lexicographical order)
|
||||
// thus,
|
||||
// if any i is prefix of i+n (n >= 1), there is overlap
|
||||
for i := 0; i < len(rfss)-1; i++ {
|
||||
if strings.HasPrefix(rfss[i+1], rfss[i]) {
|
||||
return fmt.Errorf("receiving jobs with overlapping root filesystems are forbidden")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
42
daemon/job/build_jobs_test.go
Normal file
42
daemon/job/build_jobs_test.go
Normal file
@ -0,0 +1,42 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestValidateReceivingSidesDoNotOverlap(t *testing.T) {
|
||||
type testCase struct {
|
||||
err bool
|
||||
input []string
|
||||
}
|
||||
tcs := []testCase{
|
||||
{false, nil},
|
||||
{false, []string{}},
|
||||
{false, []string{""}}, // not our job to determine valid paths
|
||||
{false, []string{"a"}},
|
||||
{false, []string{"some/path"}},
|
||||
{false, []string{"zroot/sink1", "zroot/sink2", "zroot/sink3"}},
|
||||
{true, []string{"zroot/b", "zroot/b"}},
|
||||
{true, []string{"zroot/foo", "zroot/foo/bar", "zroot/baz"}},
|
||||
{false, []string{"a/x", "b/x"}},
|
||||
{false, []string{"a", "b"}},
|
||||
{true, []string{"a", "a"}},
|
||||
{true, []string{"a/x/y", "a/x"}},
|
||||
{true, []string{"a/x", "a/x/y"}},
|
||||
{true, []string{"a/x", "b/x", "a/x/y"}},
|
||||
{true, []string{"a", "a/b", "a/c", "a/b"}},
|
||||
{true, []string{"a/b", "a/c", "a/b", "a/d", "a/c"}},
|
||||
}
|
||||
|
||||
for _, tc := range tcs {
|
||||
t.Logf("input: %v", tc.input)
|
||||
err := validateReceivingSidesDoNotOverlap(tc.input)
|
||||
if tc.err {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
}
|
@ -4,8 +4,10 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/zrepl/zrepl/logger"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
)
|
||||
|
||||
type Logger = logger.Logger
|
||||
@ -33,6 +35,9 @@ type Job interface {
|
||||
Run(ctx context.Context)
|
||||
Status() *Status
|
||||
RegisterMetrics(registerer prometheus.Registerer)
|
||||
// Jobs that return a subtree of the dataset hierarchy
|
||||
// must return the root of that subtree as rfs and ok = true
|
||||
OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool)
|
||||
}
|
||||
|
||||
type Type string
|
||||
|
@ -103,6 +103,15 @@ func (s *PassiveSide) Status() *Status {
|
||||
return &Status{Type: s.mode.Type()} // FIXME PassiveStatus
|
||||
}
|
||||
|
||||
func (j *PassiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
|
||||
sink, ok := j.mode.(*modeSink)
|
||||
if !ok {
|
||||
_ = j.mode.(*modeSource) // make sure we didn't introduce a new job type
|
||||
return nil, false
|
||||
}
|
||||
return sink.rootDataset.Copy(), true
|
||||
}
|
||||
|
||||
func (*PassiveSide) RegisterMetrics(registerer prometheus.Registerer) {}
|
||||
|
||||
func (j *PassiveSide) Run(ctx context.Context) {
|
||||
|
@ -77,6 +77,10 @@ func (j *SnapJob) Status() *Status {
|
||||
return &Status{Type: t, JobSpecific: s}
|
||||
}
|
||||
|
||||
func (j *SnapJob) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (j *SnapJob) Run(ctx context.Context) {
|
||||
log := GetLogger(ctx)
|
||||
ctx = logging.WithSubsystemLoggers(ctx, log)
|
||||
|
@ -42,6 +42,8 @@ func (j *prometheusJob) Name() string { return jobNamePrometheus }
|
||||
|
||||
func (j *prometheusJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }
|
||||
|
||||
func (j *prometheusJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { return nil, false }
|
||||
|
||||
func (j *prometheusJob) RegisterMetrics(registerer prometheus.Registerer) {}
|
||||
|
||||
func (j *prometheusJob) Run(ctx context.Context) {
|
||||
|
@ -108,6 +108,7 @@ The **replication cursor bookmark** ``#zrepl_replication_cursor`` is kept per fi
|
||||
It is a bookmark of the most recent successfully replicated snapshot to the receiving side.
|
||||
It is is used by the :ref:`not_replicated <prune-keep-not-replicated>` keep rule to identify all snapshots that have not yet been replicated to the receiving side.
|
||||
Regardless of whether that keep rule is used, the bookmark ensures that replication can always continue incrementally.
|
||||
Note that there is only one cursor bookmark per filesystem, which prohibits multiple jobs to replicate the same filesystem (:ref:`see below<jobs-multiple-jobs>`).
|
||||
|
||||
.. _replication-placeholder-property:
|
||||
|
||||
@ -172,6 +173,50 @@ Note that you will have to trigger replication manually using the ``zrepl signal
|
||||
type: manual
|
||||
...
|
||||
|
||||
.. _jobs-multiple-jobs:
|
||||
|
||||
Multiple Jobs & More than 2 Machines
|
||||
------------------------------------
|
||||
|
||||
.. ATTENTION::
|
||||
|
||||
When using multiple jobs across single or multiple machines, the following rules are critical to avoid race conditions & data loss:
|
||||
|
||||
1. The sets of ZFS filesystems matched by the ``filesystems`` filter fields must be disjoint across all jobs configured on a machine.
|
||||
2. The ZFS filesystem subtrees of jobs with ``root_fs`` must be disjoint.
|
||||
3. Across all zrepl instances on all machines in the replication domain, there must be a 1:1 correspondence between active and passive jobs.
|
||||
|
||||
Explanations & exceptions to above rules are detailed below.
|
||||
|
||||
If you would like to see improvements to multi-job setups, please `open an issue on GitHub <https://github.com/zrepl/zrepl/issues/new>`_.
|
||||
|
||||
No Overlapping
|
||||
~~~~~~~~~~~~~~
|
||||
|
||||
Jobs run independently of each other.
|
||||
If two jobs match the same filesystem with their ``filesystems`` filter, they will operate on that filesystem independently and potentially in parallel.
|
||||
For example, if job A prunes snapshots that job B is planning to replicate, the replication will fail because B asssumed the snapshot to still be present.
|
||||
More subtle race conditions can occur with the :ref:`replication cursor bookmark <replication-cursor-bookmark>`, which currently only exists once per filesystem.
|
||||
|
||||
N push jobs to 1 sink
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The :ref:`sink job <job-sink>` namespaces by client identity.
|
||||
It is thus safe to push to one sink job with different client identities.
|
||||
If the push jobs have the same client identity, the filesystems matched by the push jobs must be disjoint to avoid races.
|
||||
|
||||
N pull jobs from 1 source
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Multiple pull jobs pulling from the same source have potential for race conditions during pruning:
|
||||
each pull job prunes the source side independently, causing replication-prune and prune-prune races.
|
||||
|
||||
There is currently no way for a pull job to filter which snapshots it should attempt to replicate.
|
||||
Thus, it is not possibe to just manually assert that the prune rules of all pull jobs are disjoint to avoid replication-prune and prune-prune races.
|
||||
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
|
||||
.. _job-push:
|
||||
|
||||
@ -217,8 +262,8 @@ Job Type ``sink``
|
||||
* - ``serve``
|
||||
- |serve-transport|
|
||||
* - ``root_fs``
|
||||
- ZFS dataset path are received to
|
||||
``$root_fs/$client_identity``
|
||||
- ZFS filesystems are received to
|
||||
``$root_fs/$client_identity/$source_path``
|
||||
|
||||
Example config: :sampleconf:`/sink.yml`
|
||||
|
||||
@ -240,8 +285,8 @@ Job Type ``pull``
|
||||
* - ``connect``
|
||||
- |connect-transport|
|
||||
* - ``root_fs``
|
||||
- ZFS dataset path are received to
|
||||
``$root_fs/$client_identity``
|
||||
- ZFS filesystems are received to
|
||||
``$root_fs/$source_path``
|
||||
* - ``interval``
|
||||
- | Interval at which to pull from the source job (e.g. ``10m``).
|
||||
| ``manual`` disables periodic pulling, replication then only happens on :ref:`wakeup <cli-signal-wakeup>`.
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/zrepl/zrepl/replication/logic/pdu"
|
||||
"github.com/zrepl/zrepl/util/chainlock"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
)
|
||||
|
||||
@ -170,13 +171,19 @@ type FSMap interface { // FIXME unused
|
||||
type Receiver struct {
|
||||
rootWithoutClientComponent *zfs.DatasetPath
|
||||
appendClientIdentity bool
|
||||
|
||||
recvParentCreationMtx *chainlock.L
|
||||
}
|
||||
|
||||
func NewReceiver(rootDataset *zfs.DatasetPath, appendClientIdentity bool) *Receiver {
|
||||
if rootDataset.Length() <= 0 {
|
||||
panic(fmt.Sprintf("root dataset must not be an empty path: %v", rootDataset))
|
||||
}
|
||||
return &Receiver{rootWithoutClientComponent: rootDataset.Copy(), appendClientIdentity: appendClientIdentity}
|
||||
return &Receiver{
|
||||
rootWithoutClientComponent: rootDataset.Copy(),
|
||||
appendClientIdentity: appendClientIdentity,
|
||||
recvParentCreationMtx: chainlock.New(),
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientIdentity(rootFS *zfs.DatasetPath, clientIdentity string) error {
|
||||
@ -323,40 +330,53 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs
|
||||
}
|
||||
|
||||
// create placeholder parent filesystems as appropriate
|
||||
//
|
||||
// Manipulating the ZFS dataset hierarchy must happen exclusively.
|
||||
// TODO: Use fine-grained locking to allow separate clients / requests to pass
|
||||
// through the following section concurrently when operating on disjoint
|
||||
// ZFS dataset hierarchy subtrees.
|
||||
var visitErr error
|
||||
f := zfs.NewDatasetPathForest()
|
||||
f.Add(lp)
|
||||
getLogger(ctx).Debug("begin tree-walk")
|
||||
f.WalkTopDown(func(v zfs.DatasetPathVisit) (visitChildTree bool) {
|
||||
if v.Path.Equal(lp) {
|
||||
return false
|
||||
}
|
||||
ph, err := zfs.ZFSGetFilesystemPlaceholderState(v.Path)
|
||||
if err != nil {
|
||||
visitErr = err
|
||||
return false
|
||||
}
|
||||
getLogger(ctx).
|
||||
WithField("fs", v.Path.ToString()).
|
||||
WithField("placeholder_state", fmt.Sprintf("%#v", ph)).
|
||||
Debug("placeholder state for filesystem")
|
||||
func() {
|
||||
getLogger(ctx).Debug("begin aquire recvParentCreationMtx")
|
||||
defer s.recvParentCreationMtx.Lock().Unlock()
|
||||
getLogger(ctx).Debug("end aquire recvParentCreationMtx")
|
||||
defer getLogger(ctx).Debug("release recvParentCreationMtx")
|
||||
|
||||
if !ph.FSExists {
|
||||
l := getLogger(ctx).WithField("placeholder_fs", v.Path)
|
||||
l.Debug("create placeholder filesystem")
|
||||
err := zfs.ZFSCreatePlaceholderFilesystem(v.Path)
|
||||
f := zfs.NewDatasetPathForest()
|
||||
f.Add(lp)
|
||||
getLogger(ctx).Debug("begin tree-walk")
|
||||
f.WalkTopDown(func(v zfs.DatasetPathVisit) (visitChildTree bool) {
|
||||
if v.Path.Equal(lp) {
|
||||
return false
|
||||
}
|
||||
ph, err := zfs.ZFSGetFilesystemPlaceholderState(v.Path)
|
||||
getLogger(ctx).
|
||||
WithField("fs", v.Path.ToString()).
|
||||
WithField("placeholder_state", fmt.Sprintf("%#v", ph)).
|
||||
WithField("err", fmt.Sprintf("%s", err)).
|
||||
WithField("errType", fmt.Sprintf("%T", err)).
|
||||
Debug("placeholder state for filesystem")
|
||||
if err != nil {
|
||||
l.WithError(err).Error("cannot create placeholder filesystem")
|
||||
visitErr = err
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
getLogger(ctx).WithField("filesystem", v.Path.ToString()).Debug("exists")
|
||||
return true // leave this fs as is
|
||||
})
|
||||
getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk")
|
||||
|
||||
if !ph.FSExists {
|
||||
l := getLogger(ctx).WithField("placeholder_fs", v.Path)
|
||||
l.Debug("create placeholder filesystem")
|
||||
err := zfs.ZFSCreatePlaceholderFilesystem(v.Path)
|
||||
if err != nil {
|
||||
l.WithError(err).Error("cannot create placeholder filesystem")
|
||||
visitErr = err
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
getLogger(ctx).WithField("filesystem", v.Path.ToString()).Debug("exists")
|
||||
return true // leave this fs as is
|
||||
})
|
||||
}()
|
||||
getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk")
|
||||
if visitErr != nil {
|
||||
return nil, visitErr
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user