diff --git a/daemon/control.go b/daemon/control.go index 41ad767..6e60061 100644 --- a/daemon/control.go +++ b/daemon/control.go @@ -5,16 +5,18 @@ import ( "context" "encoding/json" "fmt" + "io" + "net" + "net/http" + "time" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/zrepl/zrepl/daemon/job" "github.com/zrepl/zrepl/daemon/nethelpers" "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/version" - "io" - "net" - "net/http" - "time" + "github.com/zrepl/zrepl/zfs" ) type controlJob struct { @@ -38,6 +40,8 @@ func (j *controlJob) Name() string { return jobNameControl } func (j *controlJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} } +func (j *controlJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { return nil, false } + var promControl struct { requestBegin *prometheus.CounterVec requestFinished *prometheus.HistogramVec diff --git a/daemon/job/active.go b/daemon/job/active.go index d70184f..a5b5a94 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -299,6 +299,15 @@ func (j *ActiveSide) Status() *Status { return &Status{Type: t, JobSpecific: s} } +func (j *ActiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) { + pull, ok := j.mode.(*modePull) + if !ok { + _ = j.mode.(*modePush) // make sure we didn't introduce a new job type + return nil, false + } + return pull.rootFS.Copy(), true +} + func (j *ActiveSide) Run(ctx context.Context) { log := GetLogger(ctx) ctx = logging.WithSubsystemLoggers(ctx, log) diff --git a/daemon/job/build_jobs.go b/daemon/job/build_jobs.go index c3eafd4..2e4a271 100644 --- a/daemon/job/build_jobs.go +++ b/daemon/job/build_jobs.go @@ -2,6 +2,9 @@ package job import ( "fmt" + "sort" + "strings" + "github.com/pkg/errors" "github.com/zrepl/zrepl/config" ) @@ -18,6 +21,22 @@ func JobsFromConfig(c *config.Config) ([]Job, error) { } js[i] = j } + + // receiving-side root filesystems must not overlap + { + rfss := make([]string, len(js)) + for i, j := range js { + jrfs, ok := j.OwnedDatasetSubtreeRoot() + if !ok { + continue + } + rfss[i] = jrfs.ToString() + } + if err := validateReceivingSidesDoNotOverlap(rfss); err != nil { + return nil, err + } + } + return js, nil } @@ -74,3 +93,27 @@ func buildJob(c *config.Global, in config.JobEnum) (j Job, err error) { return j, nil } + +func validateReceivingSidesDoNotOverlap(receivingRootFSs []string) error { + if len(receivingRootFSs) == 0 { + return nil + } + rfss := make([]string, len(receivingRootFSs)) + copy(rfss, receivingRootFSs) + sort.Slice(rfss, func(i, j int) bool { + return strings.Compare(rfss[i], rfss[j]) == -1 + }) + // idea: + // no path in rfss must be prefix of another + // + // rfss is now lexicographically sorted, which means that + // if i is prefix of j, i < j (in lexicographical order) + // thus, + // if any i is prefix of i+n (n >= 1), there is overlap + for i := 0; i < len(rfss)-1; i++ { + if strings.HasPrefix(rfss[i+1], rfss[i]) { + return fmt.Errorf("receiving jobs with overlapping root filesystems are forbidden") + } + } + return nil +} diff --git a/daemon/job/build_jobs_test.go b/daemon/job/build_jobs_test.go new file mode 100644 index 0000000..02f122e --- /dev/null +++ b/daemon/job/build_jobs_test.go @@ -0,0 +1,42 @@ +package job + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValidateReceivingSidesDoNotOverlap(t *testing.T) { + type testCase struct { + err bool + input []string + } + tcs := []testCase{ + {false, nil}, + {false, []string{}}, + {false, []string{""}}, // not our job to determine valid paths + {false, []string{"a"}}, + {false, []string{"some/path"}}, + {false, []string{"zroot/sink1", "zroot/sink2", "zroot/sink3"}}, + {true, []string{"zroot/b", "zroot/b"}}, + {true, []string{"zroot/foo", "zroot/foo/bar", "zroot/baz"}}, + {false, []string{"a/x", "b/x"}}, + {false, []string{"a", "b"}}, + {true, []string{"a", "a"}}, + {true, []string{"a/x/y", "a/x"}}, + {true, []string{"a/x", "a/x/y"}}, + {true, []string{"a/x", "b/x", "a/x/y"}}, + {true, []string{"a", "a/b", "a/c", "a/b"}}, + {true, []string{"a/b", "a/c", "a/b", "a/d", "a/c"}}, + } + + for _, tc := range tcs { + t.Logf("input: %v", tc.input) + err := validateReceivingSidesDoNotOverlap(tc.input) + if tc.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + } +} diff --git a/daemon/job/job.go b/daemon/job/job.go index 0c97f94..a472bd8 100644 --- a/daemon/job/job.go +++ b/daemon/job/job.go @@ -4,8 +4,10 @@ import ( "context" "encoding/json" "fmt" + "github.com/prometheus/client_golang/prometheus" "github.com/zrepl/zrepl/logger" + "github.com/zrepl/zrepl/zfs" ) type Logger = logger.Logger @@ -33,6 +35,9 @@ type Job interface { Run(ctx context.Context) Status() *Status RegisterMetrics(registerer prometheus.Registerer) + // Jobs that return a subtree of the dataset hierarchy + // must return the root of that subtree as rfs and ok = true + OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) } type Type string diff --git a/daemon/job/passive.go b/daemon/job/passive.go index aae3f5a..5c7b48b 100644 --- a/daemon/job/passive.go +++ b/daemon/job/passive.go @@ -103,6 +103,15 @@ func (s *PassiveSide) Status() *Status { return &Status{Type: s.mode.Type()} // FIXME PassiveStatus } +func (j *PassiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) { + sink, ok := j.mode.(*modeSink) + if !ok { + _ = j.mode.(*modeSource) // make sure we didn't introduce a new job type + return nil, false + } + return sink.rootDataset.Copy(), true +} + func (*PassiveSide) RegisterMetrics(registerer prometheus.Registerer) {} func (j *PassiveSide) Run(ctx context.Context) { diff --git a/daemon/job/snapjob.go b/daemon/job/snapjob.go index b08a17d..7eb6bef 100644 --- a/daemon/job/snapjob.go +++ b/daemon/job/snapjob.go @@ -77,6 +77,10 @@ func (j *SnapJob) Status() *Status { return &Status{Type: t, JobSpecific: s} } +func (j *SnapJob) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) { + return nil, false +} + func (j *SnapJob) Run(ctx context.Context) { log := GetLogger(ctx) ctx = logging.WithSubsystemLoggers(ctx, log) diff --git a/daemon/prometheus.go b/daemon/prometheus.go index 1f10e9d..9db3554 100644 --- a/daemon/prometheus.go +++ b/daemon/prometheus.go @@ -42,6 +42,8 @@ func (j *prometheusJob) Name() string { return jobNamePrometheus } func (j *prometheusJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} } +func (j *prometheusJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { return nil, false } + func (j *prometheusJob) RegisterMetrics(registerer prometheus.Registerer) {} func (j *prometheusJob) Run(ctx context.Context) { diff --git a/docs/configuration/jobs.rst b/docs/configuration/jobs.rst index 5e6d0be..7f5207b 100644 --- a/docs/configuration/jobs.rst +++ b/docs/configuration/jobs.rst @@ -108,6 +108,7 @@ The **replication cursor bookmark** ``#zrepl_replication_cursor`` is kept per fi It is a bookmark of the most recent successfully replicated snapshot to the receiving side. It is is used by the :ref:`not_replicated ` keep rule to identify all snapshots that have not yet been replicated to the receiving side. Regardless of whether that keep rule is used, the bookmark ensures that replication can always continue incrementally. +Note that there is only one cursor bookmark per filesystem, which prohibits multiple jobs to replicate the same filesystem (:ref:`see below`). .. _replication-placeholder-property: @@ -172,6 +173,50 @@ Note that you will have to trigger replication manually using the ``zrepl signal type: manual ... +.. _jobs-multiple-jobs: + +Multiple Jobs & More than 2 Machines +------------------------------------ + +.. ATTENTION:: + + When using multiple jobs across single or multiple machines, the following rules are critical to avoid race conditions & data loss: + + 1. The sets of ZFS filesystems matched by the ``filesystems`` filter fields must be disjoint across all jobs configured on a machine. + 2. The ZFS filesystem subtrees of jobs with ``root_fs`` must be disjoint. + 3. Across all zrepl instances on all machines in the replication domain, there must be a 1:1 correspondence between active and passive jobs. + + Explanations & exceptions to above rules are detailed below. + +If you would like to see improvements to multi-job setups, please `open an issue on GitHub `_. + +No Overlapping +~~~~~~~~~~~~~~ + +Jobs run independently of each other. +If two jobs match the same filesystem with their ``filesystems`` filter, they will operate on that filesystem independently and potentially in parallel. +For example, if job A prunes snapshots that job B is planning to replicate, the replication will fail because B asssumed the snapshot to still be present. +More subtle race conditions can occur with the :ref:`replication cursor bookmark `, which currently only exists once per filesystem. + +N push jobs to 1 sink +~~~~~~~~~~~~~~~~~~~~~ + +The :ref:`sink job ` namespaces by client identity. +It is thus safe to push to one sink job with different client identities. +If the push jobs have the same client identity, the filesystems matched by the push jobs must be disjoint to avoid races. + +N pull jobs from 1 source +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Multiple pull jobs pulling from the same source have potential for race conditions during pruning: +each pull job prunes the source side independently, causing replication-prune and prune-prune races. + +There is currently no way for a pull job to filter which snapshots it should attempt to replicate. +Thus, it is not possibe to just manually assert that the prune rules of all pull jobs are disjoint to avoid replication-prune and prune-prune races. + + +------------------------------------------------------------------------------ + .. _job-push: @@ -217,8 +262,8 @@ Job Type ``sink`` * - ``serve`` - |serve-transport| * - ``root_fs`` - - ZFS dataset path are received to - ``$root_fs/$client_identity`` + - ZFS filesystems are received to + ``$root_fs/$client_identity/$source_path`` Example config: :sampleconf:`/sink.yml` @@ -240,8 +285,8 @@ Job Type ``pull`` * - ``connect`` - |connect-transport| * - ``root_fs`` - - ZFS dataset path are received to - ``$root_fs/$client_identity`` + - ZFS filesystems are received to + ``$root_fs/$source_path`` * - ``interval`` - | Interval at which to pull from the source job (e.g. ``10m``). | ``manual`` disables periodic pulling, replication then only happens on :ref:`wakeup `. diff --git a/endpoint/endpoint.go b/endpoint/endpoint.go index 4c393cb..4f35ffb 100644 --- a/endpoint/endpoint.go +++ b/endpoint/endpoint.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" "github.com/zrepl/zrepl/replication/logic/pdu" + "github.com/zrepl/zrepl/util/chainlock" "github.com/zrepl/zrepl/zfs" ) @@ -170,13 +171,19 @@ type FSMap interface { // FIXME unused type Receiver struct { rootWithoutClientComponent *zfs.DatasetPath appendClientIdentity bool + + recvParentCreationMtx *chainlock.L } func NewReceiver(rootDataset *zfs.DatasetPath, appendClientIdentity bool) *Receiver { if rootDataset.Length() <= 0 { panic(fmt.Sprintf("root dataset must not be an empty path: %v", rootDataset)) } - return &Receiver{rootWithoutClientComponent: rootDataset.Copy(), appendClientIdentity: appendClientIdentity} + return &Receiver{ + rootWithoutClientComponent: rootDataset.Copy(), + appendClientIdentity: appendClientIdentity, + recvParentCreationMtx: chainlock.New(), + } } func TestClientIdentity(rootFS *zfs.DatasetPath, clientIdentity string) error { @@ -323,40 +330,53 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs } // create placeholder parent filesystems as appropriate + // + // Manipulating the ZFS dataset hierarchy must happen exclusively. + // TODO: Use fine-grained locking to allow separate clients / requests to pass + // through the following section concurrently when operating on disjoint + // ZFS dataset hierarchy subtrees. var visitErr error - f := zfs.NewDatasetPathForest() - f.Add(lp) - getLogger(ctx).Debug("begin tree-walk") - f.WalkTopDown(func(v zfs.DatasetPathVisit) (visitChildTree bool) { - if v.Path.Equal(lp) { - return false - } - ph, err := zfs.ZFSGetFilesystemPlaceholderState(v.Path) - if err != nil { - visitErr = err - return false - } - getLogger(ctx). - WithField("fs", v.Path.ToString()). - WithField("placeholder_state", fmt.Sprintf("%#v", ph)). - Debug("placeholder state for filesystem") + func() { + getLogger(ctx).Debug("begin aquire recvParentCreationMtx") + defer s.recvParentCreationMtx.Lock().Unlock() + getLogger(ctx).Debug("end aquire recvParentCreationMtx") + defer getLogger(ctx).Debug("release recvParentCreationMtx") - if !ph.FSExists { - l := getLogger(ctx).WithField("placeholder_fs", v.Path) - l.Debug("create placeholder filesystem") - err := zfs.ZFSCreatePlaceholderFilesystem(v.Path) + f := zfs.NewDatasetPathForest() + f.Add(lp) + getLogger(ctx).Debug("begin tree-walk") + f.WalkTopDown(func(v zfs.DatasetPathVisit) (visitChildTree bool) { + if v.Path.Equal(lp) { + return false + } + ph, err := zfs.ZFSGetFilesystemPlaceholderState(v.Path) + getLogger(ctx). + WithField("fs", v.Path.ToString()). + WithField("placeholder_state", fmt.Sprintf("%#v", ph)). + WithField("err", fmt.Sprintf("%s", err)). + WithField("errType", fmt.Sprintf("%T", err)). + Debug("placeholder state for filesystem") if err != nil { - l.WithError(err).Error("cannot create placeholder filesystem") visitErr = err return false } - return true - } - getLogger(ctx).WithField("filesystem", v.Path.ToString()).Debug("exists") - return true // leave this fs as is - }) - getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk") + if !ph.FSExists { + l := getLogger(ctx).WithField("placeholder_fs", v.Path) + l.Debug("create placeholder filesystem") + err := zfs.ZFSCreatePlaceholderFilesystem(v.Path) + if err != nil { + l.WithError(err).Error("cannot create placeholder filesystem") + visitErr = err + return false + } + return true + } + getLogger(ctx).WithField("filesystem", v.Path.ToString()).Debug("exists") + return true // leave this fs as is + }) + }() + getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk") if visitErr != nil { return nil, visitErr }