mirror of
https://github.com/zrepl/zrepl.git
synced 2024-11-29 03:45:27 +01:00
config + job: forbid non-verlapping receiver root_fs
refs #136 refs #140
This commit is contained in:
parent
3e71542c78
commit
7756c9a55c
@ -5,16 +5,18 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/zrepl/zrepl/daemon/job"
|
||||
"github.com/zrepl/zrepl/daemon/nethelpers"
|
||||
"github.com/zrepl/zrepl/logger"
|
||||
"github.com/zrepl/zrepl/version"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
)
|
||||
|
||||
type controlJob struct {
|
||||
@ -38,6 +40,8 @@ func (j *controlJob) Name() string { return jobNameControl }
|
||||
|
||||
func (j *controlJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }
|
||||
|
||||
func (j *controlJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { return nil, false }
|
||||
|
||||
var promControl struct {
|
||||
requestBegin *prometheus.CounterVec
|
||||
requestFinished *prometheus.HistogramVec
|
||||
|
@ -299,6 +299,15 @@ func (j *ActiveSide) Status() *Status {
|
||||
return &Status{Type: t, JobSpecific: s}
|
||||
}
|
||||
|
||||
func (j *ActiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
|
||||
pull, ok := j.mode.(*modePull)
|
||||
if !ok {
|
||||
_ = j.mode.(*modePush) // make sure we didn't introduce a new job type
|
||||
return nil, false
|
||||
}
|
||||
return pull.rootFS.Copy(), true
|
||||
}
|
||||
|
||||
func (j *ActiveSide) Run(ctx context.Context) {
|
||||
log := GetLogger(ctx)
|
||||
ctx = logging.WithSubsystemLoggers(ctx, log)
|
||||
|
@ -2,6 +2,9 @@ package job
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/zrepl/zrepl/config"
|
||||
)
|
||||
@ -18,6 +21,22 @@ func JobsFromConfig(c *config.Config) ([]Job, error) {
|
||||
}
|
||||
js[i] = j
|
||||
}
|
||||
|
||||
// receiving-side root filesystems must not overlap
|
||||
{
|
||||
rfss := make([]string, len(js))
|
||||
for i, j := range js {
|
||||
jrfs, ok := j.OwnedDatasetSubtreeRoot()
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
rfss[i] = jrfs.ToString()
|
||||
}
|
||||
if err := validateReceivingSidesDoNotOverlap(rfss); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return js, nil
|
||||
}
|
||||
|
||||
@ -74,3 +93,27 @@ func buildJob(c *config.Global, in config.JobEnum) (j Job, err error) {
|
||||
return j, nil
|
||||
|
||||
}
|
||||
|
||||
func validateReceivingSidesDoNotOverlap(receivingRootFSs []string) error {
|
||||
if len(receivingRootFSs) == 0 {
|
||||
return nil
|
||||
}
|
||||
rfss := make([]string, len(receivingRootFSs))
|
||||
copy(rfss, receivingRootFSs)
|
||||
sort.Slice(rfss, func(i, j int) bool {
|
||||
return strings.Compare(rfss[i], rfss[j]) == -1
|
||||
})
|
||||
// idea:
|
||||
// no path in rfss must be prefix of another
|
||||
//
|
||||
// rfss is now lexicographically sorted, which means that
|
||||
// if i is prefix of j, i < j (in lexicographical order)
|
||||
// thus,
|
||||
// if any i is prefix of i+n (n >= 1), there is overlap
|
||||
for i := 0; i < len(rfss)-1; i++ {
|
||||
if strings.HasPrefix(rfss[i+1], rfss[i]) {
|
||||
return fmt.Errorf("receiving jobs with overlapping root filesystems are forbidden")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
42
daemon/job/build_jobs_test.go
Normal file
42
daemon/job/build_jobs_test.go
Normal file
@ -0,0 +1,42 @@
|
||||
package job
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestValidateReceivingSidesDoNotOverlap(t *testing.T) {
|
||||
type testCase struct {
|
||||
err bool
|
||||
input []string
|
||||
}
|
||||
tcs := []testCase{
|
||||
{false, nil},
|
||||
{false, []string{}},
|
||||
{false, []string{""}}, // not our job to determine valid paths
|
||||
{false, []string{"a"}},
|
||||
{false, []string{"some/path"}},
|
||||
{false, []string{"zroot/sink1", "zroot/sink2", "zroot/sink3"}},
|
||||
{true, []string{"zroot/b", "zroot/b"}},
|
||||
{true, []string{"zroot/foo", "zroot/foo/bar", "zroot/baz"}},
|
||||
{false, []string{"a/x", "b/x"}},
|
||||
{false, []string{"a", "b"}},
|
||||
{true, []string{"a", "a"}},
|
||||
{true, []string{"a/x/y", "a/x"}},
|
||||
{true, []string{"a/x", "a/x/y"}},
|
||||
{true, []string{"a/x", "b/x", "a/x/y"}},
|
||||
{true, []string{"a", "a/b", "a/c", "a/b"}},
|
||||
{true, []string{"a/b", "a/c", "a/b", "a/d", "a/c"}},
|
||||
}
|
||||
|
||||
for _, tc := range tcs {
|
||||
t.Logf("input: %v", tc.input)
|
||||
err := validateReceivingSidesDoNotOverlap(tc.input)
|
||||
if tc.err {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
}
|
@ -4,8 +4,10 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/zrepl/zrepl/logger"
|
||||
"github.com/zrepl/zrepl/zfs"
|
||||
)
|
||||
|
||||
type Logger = logger.Logger
|
||||
@ -33,6 +35,9 @@ type Job interface {
|
||||
Run(ctx context.Context)
|
||||
Status() *Status
|
||||
RegisterMetrics(registerer prometheus.Registerer)
|
||||
// Jobs that return a subtree of the dataset hierarchy
|
||||
// must return the root of that subtree as rfs and ok = true
|
||||
OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool)
|
||||
}
|
||||
|
||||
type Type string
|
||||
|
@ -103,6 +103,15 @@ func (s *PassiveSide) Status() *Status {
|
||||
return &Status{Type: s.mode.Type()} // FIXME PassiveStatus
|
||||
}
|
||||
|
||||
func (j *PassiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
|
||||
sink, ok := j.mode.(*modeSink)
|
||||
if !ok {
|
||||
_ = j.mode.(*modeSource) // make sure we didn't introduce a new job type
|
||||
return nil, false
|
||||
}
|
||||
return sink.rootDataset.Copy(), true
|
||||
}
|
||||
|
||||
func (*PassiveSide) RegisterMetrics(registerer prometheus.Registerer) {}
|
||||
|
||||
func (j *PassiveSide) Run(ctx context.Context) {
|
||||
|
@ -77,6 +77,10 @@ func (j *SnapJob) Status() *Status {
|
||||
return &Status{Type: t, JobSpecific: s}
|
||||
}
|
||||
|
||||
func (j *SnapJob) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (j *SnapJob) Run(ctx context.Context) {
|
||||
log := GetLogger(ctx)
|
||||
ctx = logging.WithSubsystemLoggers(ctx, log)
|
||||
|
@ -42,6 +42,8 @@ func (j *prometheusJob) Name() string { return jobNamePrometheus }
|
||||
|
||||
func (j *prometheusJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }
|
||||
|
||||
func (j *prometheusJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { return nil, false }
|
||||
|
||||
func (j *prometheusJob) RegisterMetrics(registerer prometheus.Registerer) {}
|
||||
|
||||
func (j *prometheusJob) Run(ctx context.Context) {
|
||||
|
Loading…
Reference in New Issue
Block a user