mirror of
https://github.com/zrepl/zrepl.git
synced 2025-05-17 23:30:46 +02:00
config + job: forbid non-verlapping receiver root_fs
refs #136 refs #140
This commit is contained in:
parent
3e71542c78
commit
7756c9a55c
@ -5,16 +5,18 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/zrepl/zrepl/daemon/job"
|
"github.com/zrepl/zrepl/daemon/job"
|
||||||
"github.com/zrepl/zrepl/daemon/nethelpers"
|
"github.com/zrepl/zrepl/daemon/nethelpers"
|
||||||
"github.com/zrepl/zrepl/logger"
|
"github.com/zrepl/zrepl/logger"
|
||||||
"github.com/zrepl/zrepl/version"
|
"github.com/zrepl/zrepl/version"
|
||||||
"io"
|
"github.com/zrepl/zrepl/zfs"
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type controlJob struct {
|
type controlJob struct {
|
||||||
@ -38,6 +40,8 @@ func (j *controlJob) Name() string { return jobNameControl }
|
|||||||
|
|
||||||
func (j *controlJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }
|
func (j *controlJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }
|
||||||
|
|
||||||
|
func (j *controlJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { return nil, false }
|
||||||
|
|
||||||
var promControl struct {
|
var promControl struct {
|
||||||
requestBegin *prometheus.CounterVec
|
requestBegin *prometheus.CounterVec
|
||||||
requestFinished *prometheus.HistogramVec
|
requestFinished *prometheus.HistogramVec
|
||||||
|
@ -299,6 +299,15 @@ func (j *ActiveSide) Status() *Status {
|
|||||||
return &Status{Type: t, JobSpecific: s}
|
return &Status{Type: t, JobSpecific: s}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (j *ActiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
|
||||||
|
pull, ok := j.mode.(*modePull)
|
||||||
|
if !ok {
|
||||||
|
_ = j.mode.(*modePush) // make sure we didn't introduce a new job type
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return pull.rootFS.Copy(), true
|
||||||
|
}
|
||||||
|
|
||||||
func (j *ActiveSide) Run(ctx context.Context) {
|
func (j *ActiveSide) Run(ctx context.Context) {
|
||||||
log := GetLogger(ctx)
|
log := GetLogger(ctx)
|
||||||
ctx = logging.WithSubsystemLoggers(ctx, log)
|
ctx = logging.WithSubsystemLoggers(ctx, log)
|
||||||
|
@ -2,6 +2,9 @@ package job
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/zrepl/zrepl/config"
|
"github.com/zrepl/zrepl/config"
|
||||||
)
|
)
|
||||||
@ -18,6 +21,22 @@ func JobsFromConfig(c *config.Config) ([]Job, error) {
|
|||||||
}
|
}
|
||||||
js[i] = j
|
js[i] = j
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// receiving-side root filesystems must not overlap
|
||||||
|
{
|
||||||
|
rfss := make([]string, len(js))
|
||||||
|
for i, j := range js {
|
||||||
|
jrfs, ok := j.OwnedDatasetSubtreeRoot()
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rfss[i] = jrfs.ToString()
|
||||||
|
}
|
||||||
|
if err := validateReceivingSidesDoNotOverlap(rfss); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return js, nil
|
return js, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -74,3 +93,27 @@ func buildJob(c *config.Global, in config.JobEnum) (j Job, err error) {
|
|||||||
return j, nil
|
return j, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func validateReceivingSidesDoNotOverlap(receivingRootFSs []string) error {
|
||||||
|
if len(receivingRootFSs) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
rfss := make([]string, len(receivingRootFSs))
|
||||||
|
copy(rfss, receivingRootFSs)
|
||||||
|
sort.Slice(rfss, func(i, j int) bool {
|
||||||
|
return strings.Compare(rfss[i], rfss[j]) == -1
|
||||||
|
})
|
||||||
|
// idea:
|
||||||
|
// no path in rfss must be prefix of another
|
||||||
|
//
|
||||||
|
// rfss is now lexicographically sorted, which means that
|
||||||
|
// if i is prefix of j, i < j (in lexicographical order)
|
||||||
|
// thus,
|
||||||
|
// if any i is prefix of i+n (n >= 1), there is overlap
|
||||||
|
for i := 0; i < len(rfss)-1; i++ {
|
||||||
|
if strings.HasPrefix(rfss[i+1], rfss[i]) {
|
||||||
|
return fmt.Errorf("receiving jobs with overlapping root filesystems are forbidden")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
42
daemon/job/build_jobs_test.go
Normal file
42
daemon/job/build_jobs_test.go
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
package job
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestValidateReceivingSidesDoNotOverlap(t *testing.T) {
|
||||||
|
type testCase struct {
|
||||||
|
err bool
|
||||||
|
input []string
|
||||||
|
}
|
||||||
|
tcs := []testCase{
|
||||||
|
{false, nil},
|
||||||
|
{false, []string{}},
|
||||||
|
{false, []string{""}}, // not our job to determine valid paths
|
||||||
|
{false, []string{"a"}},
|
||||||
|
{false, []string{"some/path"}},
|
||||||
|
{false, []string{"zroot/sink1", "zroot/sink2", "zroot/sink3"}},
|
||||||
|
{true, []string{"zroot/b", "zroot/b"}},
|
||||||
|
{true, []string{"zroot/foo", "zroot/foo/bar", "zroot/baz"}},
|
||||||
|
{false, []string{"a/x", "b/x"}},
|
||||||
|
{false, []string{"a", "b"}},
|
||||||
|
{true, []string{"a", "a"}},
|
||||||
|
{true, []string{"a/x/y", "a/x"}},
|
||||||
|
{true, []string{"a/x", "a/x/y"}},
|
||||||
|
{true, []string{"a/x", "b/x", "a/x/y"}},
|
||||||
|
{true, []string{"a", "a/b", "a/c", "a/b"}},
|
||||||
|
{true, []string{"a/b", "a/c", "a/b", "a/d", "a/c"}},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Logf("input: %v", tc.input)
|
||||||
|
err := validateReceivingSidesDoNotOverlap(tc.input)
|
||||||
|
if tc.err {
|
||||||
|
assert.Error(t, err)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -4,8 +4,10 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/zrepl/zrepl/logger"
|
"github.com/zrepl/zrepl/logger"
|
||||||
|
"github.com/zrepl/zrepl/zfs"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Logger = logger.Logger
|
type Logger = logger.Logger
|
||||||
@ -33,6 +35,9 @@ type Job interface {
|
|||||||
Run(ctx context.Context)
|
Run(ctx context.Context)
|
||||||
Status() *Status
|
Status() *Status
|
||||||
RegisterMetrics(registerer prometheus.Registerer)
|
RegisterMetrics(registerer prometheus.Registerer)
|
||||||
|
// Jobs that return a subtree of the dataset hierarchy
|
||||||
|
// must return the root of that subtree as rfs and ok = true
|
||||||
|
OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Type string
|
type Type string
|
||||||
|
@ -103,6 +103,15 @@ func (s *PassiveSide) Status() *Status {
|
|||||||
return &Status{Type: s.mode.Type()} // FIXME PassiveStatus
|
return &Status{Type: s.mode.Type()} // FIXME PassiveStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (j *PassiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
|
||||||
|
sink, ok := j.mode.(*modeSink)
|
||||||
|
if !ok {
|
||||||
|
_ = j.mode.(*modeSource) // make sure we didn't introduce a new job type
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return sink.rootDataset.Copy(), true
|
||||||
|
}
|
||||||
|
|
||||||
func (*PassiveSide) RegisterMetrics(registerer prometheus.Registerer) {}
|
func (*PassiveSide) RegisterMetrics(registerer prometheus.Registerer) {}
|
||||||
|
|
||||||
func (j *PassiveSide) Run(ctx context.Context) {
|
func (j *PassiveSide) Run(ctx context.Context) {
|
||||||
|
@ -77,6 +77,10 @@ func (j *SnapJob) Status() *Status {
|
|||||||
return &Status{Type: t, JobSpecific: s}
|
return &Status{Type: t, JobSpecific: s}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (j *SnapJob) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
func (j *SnapJob) Run(ctx context.Context) {
|
func (j *SnapJob) Run(ctx context.Context) {
|
||||||
log := GetLogger(ctx)
|
log := GetLogger(ctx)
|
||||||
ctx = logging.WithSubsystemLoggers(ctx, log)
|
ctx = logging.WithSubsystemLoggers(ctx, log)
|
||||||
|
@ -42,6 +42,8 @@ func (j *prometheusJob) Name() string { return jobNamePrometheus }
|
|||||||
|
|
||||||
func (j *prometheusJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }
|
func (j *prometheusJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }
|
||||||
|
|
||||||
|
func (j *prometheusJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { return nil, false }
|
||||||
|
|
||||||
func (j *prometheusJob) RegisterMetrics(registerer prometheus.Registerer) {}
|
func (j *prometheusJob) RegisterMetrics(registerer prometheus.Registerer) {}
|
||||||
|
|
||||||
func (j *prometheusJob) Run(ctx context.Context) {
|
func (j *prometheusJob) Run(ctx context.Context) {
|
||||||
|
Loading…
Reference in New Issue
Block a user