config + job: forbid non-verlapping receiver root_fs

refs #136
refs #140
This commit is contained in:
Christian Schwarz 2019-03-20 23:01:24 +01:00
parent 3e71542c78
commit 7756c9a55c
8 changed files with 122 additions and 4 deletions

View File

@ -5,16 +5,18 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/nethelpers"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/version"
"io"
"net"
"net/http"
"time"
"github.com/zrepl/zrepl/zfs"
)
type controlJob struct {
@ -38,6 +40,8 @@ func (j *controlJob) Name() string { return jobNameControl }
func (j *controlJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }
func (j *controlJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { return nil, false }
var promControl struct {
requestBegin *prometheus.CounterVec
requestFinished *prometheus.HistogramVec

View File

@ -299,6 +299,15 @@ func (j *ActiveSide) Status() *Status {
return &Status{Type: t, JobSpecific: s}
}
func (j *ActiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
pull, ok := j.mode.(*modePull)
if !ok {
_ = j.mode.(*modePush) // make sure we didn't introduce a new job type
return nil, false
}
return pull.rootFS.Copy(), true
}
func (j *ActiveSide) Run(ctx context.Context) {
log := GetLogger(ctx)
ctx = logging.WithSubsystemLoggers(ctx, log)

View File

@ -2,6 +2,9 @@ package job
import (
"fmt"
"sort"
"strings"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
)
@ -18,6 +21,22 @@ func JobsFromConfig(c *config.Config) ([]Job, error) {
}
js[i] = j
}
// receiving-side root filesystems must not overlap
{
rfss := make([]string, len(js))
for i, j := range js {
jrfs, ok := j.OwnedDatasetSubtreeRoot()
if !ok {
continue
}
rfss[i] = jrfs.ToString()
}
if err := validateReceivingSidesDoNotOverlap(rfss); err != nil {
return nil, err
}
}
return js, nil
}
@ -74,3 +93,27 @@ func buildJob(c *config.Global, in config.JobEnum) (j Job, err error) {
return j, nil
}
func validateReceivingSidesDoNotOverlap(receivingRootFSs []string) error {
if len(receivingRootFSs) == 0 {
return nil
}
rfss := make([]string, len(receivingRootFSs))
copy(rfss, receivingRootFSs)
sort.Slice(rfss, func(i, j int) bool {
return strings.Compare(rfss[i], rfss[j]) == -1
})
// idea:
// no path in rfss must be prefix of another
//
// rfss is now lexicographically sorted, which means that
// if i is prefix of j, i < j (in lexicographical order)
// thus,
// if any i is prefix of i+n (n >= 1), there is overlap
for i := 0; i < len(rfss)-1; i++ {
if strings.HasPrefix(rfss[i+1], rfss[i]) {
return fmt.Errorf("receiving jobs with overlapping root filesystems are forbidden")
}
}
return nil
}

View File

@ -0,0 +1,42 @@
package job
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestValidateReceivingSidesDoNotOverlap(t *testing.T) {
type testCase struct {
err bool
input []string
}
tcs := []testCase{
{false, nil},
{false, []string{}},
{false, []string{""}}, // not our job to determine valid paths
{false, []string{"a"}},
{false, []string{"some/path"}},
{false, []string{"zroot/sink1", "zroot/sink2", "zroot/sink3"}},
{true, []string{"zroot/b", "zroot/b"}},
{true, []string{"zroot/foo", "zroot/foo/bar", "zroot/baz"}},
{false, []string{"a/x", "b/x"}},
{false, []string{"a", "b"}},
{true, []string{"a", "a"}},
{true, []string{"a/x/y", "a/x"}},
{true, []string{"a/x", "a/x/y"}},
{true, []string{"a/x", "b/x", "a/x/y"}},
{true, []string{"a", "a/b", "a/c", "a/b"}},
{true, []string{"a/b", "a/c", "a/b", "a/d", "a/c"}},
}
for _, tc := range tcs {
t.Logf("input: %v", tc.input)
err := validateReceivingSidesDoNotOverlap(tc.input)
if tc.err {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
}
}

View File

@ -4,8 +4,10 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/zfs"
)
type Logger = logger.Logger
@ -33,6 +35,9 @@ type Job interface {
Run(ctx context.Context)
Status() *Status
RegisterMetrics(registerer prometheus.Registerer)
// Jobs that return a subtree of the dataset hierarchy
// must return the root of that subtree as rfs and ok = true
OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool)
}
type Type string

View File

@ -103,6 +103,15 @@ func (s *PassiveSide) Status() *Status {
return &Status{Type: s.mode.Type()} // FIXME PassiveStatus
}
func (j *PassiveSide) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
sink, ok := j.mode.(*modeSink)
if !ok {
_ = j.mode.(*modeSource) // make sure we didn't introduce a new job type
return nil, false
}
return sink.rootDataset.Copy(), true
}
func (*PassiveSide) RegisterMetrics(registerer prometheus.Registerer) {}
func (j *PassiveSide) Run(ctx context.Context) {

View File

@ -77,6 +77,10 @@ func (j *SnapJob) Status() *Status {
return &Status{Type: t, JobSpecific: s}
}
func (j *SnapJob) OwnedDatasetSubtreeRoot() (rfs *zfs.DatasetPath, ok bool) {
return nil, false
}
func (j *SnapJob) Run(ctx context.Context) {
log := GetLogger(ctx)
ctx = logging.WithSubsystemLoggers(ctx, log)

View File

@ -42,6 +42,8 @@ func (j *prometheusJob) Name() string { return jobNamePrometheus }
func (j *prometheusJob) Status() *job.Status { return &job.Status{Type: job.TypeInternal} }
func (j *prometheusJob) OwnedDatasetSubtreeRoot() (p *zfs.DatasetPath, ok bool) { return nil, false }
func (j *prometheusJob) RegisterMetrics(registerer prometheus.Registerer) {}
func (j *prometheusJob) Run(ctx context.Context) {