rework filters & mappings

config now defines a single data structure (DatasetMapFilter) that can act
both as a map and as a filter.

Clean up the wildcard syntax along the way (this also changes its semantics).

Christian Schwarz, 2017-08-05 21:15:37 +02:00
parent 3fac6a67df
commit 2ce07c9342
13 changed files with 478 additions and 459 deletions
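For orientation, a minimal sketch (not part of the diff; it assumes the DatasetMapFilter type added in cmd/config_mapfilter.go below) of the reworked wildcard semantics, where a trailing '<' replaces the old '*'; the expected values mirror TestDatasetMapFilter in cmd/config_test.go:

// sketch.go (hypothetical), in package cmd of this commit
package cmd

import "github.com/zrepl/zrepl/zfs"

func sketchMapping() {
	m := NewDatasetMapFilter(2, false)
	_ = m.Add("a/b<", "root2")   // trailing "<" matches the dataset itself and its whole subtree
	_ = m.Add("a/b/c<", "root1") // the longer (more specific) prefix wins

	p, _ := zfs.NewDatasetPath("a/b/c/d")
	target, _ := m.Map(p)
	println(target.ToString()) // "root1/d": the matched prefix is stripped, the remainder appended to the target
}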

View File

@@ -52,26 +52,21 @@ type SSHTransport struct {
 type Push struct {
 	JobName           string // for use with jobrun package
 	To                *Remote
-	Filter            zfs.DatasetMapping
+	Filter            zfs.DatasetFilter
 	InitialReplPolicy rpc.InitialReplPolicy
 	RepeatStrategy    jobrun.RepeatStrategy
 }
 type Pull struct {
 	JobName           string // for use with jobrun package
 	From              *Remote
-	Mapping           zfs.DatasetMapping
+	Mapping           DatasetMapFilter
 	InitialReplPolicy rpc.InitialReplPolicy
 	RepeatStrategy    jobrun.RepeatStrategy
 }
-type ClientMapping struct {
-	From    string
-	Mapping zfs.DatasetMapping
-}
 type Prune struct {
 	JobName         string // for use with jobrun package
-	DatasetFilter   zfs.DatasetMapping
+	DatasetFilter   zfs.DatasetFilter
 	SnapshotFilter  zfs.FilesystemVersionFilter
 	RetentionPolicy *RetentionGrid // TODO abstract interface to support future policies?
 }
@@ -80,15 +75,15 @@ type Autosnap struct {
 	JobName       string // for use with jobrun package
 	Prefix        string
 	Interval      jobrun.RepeatStrategy
-	DatasetFilter zfs.DatasetMapping
+	DatasetFilter zfs.DatasetFilter
 }
 type Config struct {
 	Remotes   map[string]*Remote
 	Pushs     map[string]*Push // job name -> job
 	Pulls     map[string]*Pull // job name -> job
-	Sinks     map[string]*ClientMapping // client identity -> mapping
-	PullACLs  map[string]*ClientMapping // client identity -> mapping
+	Sinks     map[string]DatasetMapFilter // client identity -> mapping
+	PullACLs  map[string]DatasetMapFilter // client identity -> filter
 	Prunes    map[string]*Prune    // job name -> job
 	Autosnaps map[string]*Autosnap // job name -> job
 }
@@ -129,10 +124,10 @@ func parseMain(root map[string]interface{}) (c Config, err error) {
 	if c.Pulls, err = parsePulls(root["pulls"], remoteLookup); err != nil {
 		return
 	}
-	if c.Sinks, err = parseClientMappings(root["sinks"]); err != nil {
+	if c.Sinks, err = parseSinks(root["sinks"]); err != nil {
 		return
 	}
-	if c.PullACLs, err = parseClientMappings(root["pull_acls"]); err != nil {
+	if c.PullACLs, err = parsePullACLs(root["pull_acls"]); err != nil {
 		return
 	}
 	if c.Prunes, err = parsePrunes(root["prune"]); err != nil {
@@ -239,7 +234,7 @@ func parsePushs(v interface{}, rl remoteLookup) (p map[string]*Push, err error)
 	if push.JobName, err = fullJobName(JobSectionPush, name); err != nil {
 		return
 	}
-	if push.Filter, err = parseComboMapping(e.Filter); err != nil {
+	if push.Filter, err = parseDatasetMapFilter(e.Filter, true); err != nil {
 		return
 	}
@@ -298,7 +293,7 @@ func parsePulls(v interface{}, rl remoteLookup) (p map[string]*Pull, err error)
 	if pull.JobName, err = fullJobName(JobSectionPull, name); err != nil {
 		return
 	}
-	if pull.Mapping, err = parseComboMapping(e.Mapping); err != nil {
+	if pull.Mapping, err = parseDatasetMapFilter(e.Mapping, false); err != nil {
 		return
 	}
 	if pull.InitialReplPolicy, err = parseInitialReplPolicy(e.InitialReplPolicy, rpc.DEFAULT_INITIAL_REPL_POLICY); err != nil {
@@ -365,99 +360,80 @@ func expectList(v interface{}) (asList []interface{}, err error) {
 	return
 }
-func parseClientMappings(v interface{}) (cm map[string]*ClientMapping, err error) {
-	var asMap map[string]interface{}
+func parseSinks(v interface{}) (m map[string]DatasetMapFilter, err error) {
+	var asMap map[string]map[string]interface{}
 	if err = mapstructure.Decode(v, &asMap); err != nil {
 		return
 	}
-	cm = make(map[string]*ClientMapping, len(asMap))
-	for identity, e := range asMap {
-		var m *ClientMapping
-		if m, err = parseClientMapping(e, identity); err != nil {
-			return
-		}
-		cm[identity] = m
+	m = make(map[string]DatasetMapFilter, len(asMap))
+	for identity, entry := range asMap {
+		parseSink := func() (mapping DatasetMapFilter, err error) {
+			mappingMap, ok := entry["mapping"]
+			if !ok {
+				err = fmt.Errorf("no mapping specified")
+				return
+			}
+			mapping, err = parseDatasetMapFilter(mappingMap, false)
+			return
+		}
+		mapping, sinkErr := parseSink()
+		if sinkErr != nil {
+			err = fmt.Errorf("cannot parse sink for identity '%s': %s", identity, sinkErr)
+			return
+		}
+		m[identity] = mapping
 	}
 	return
 }
-func parseClientMapping(v interface{}, identity string) (s *ClientMapping, err error) {
-	t := struct {
-		Mapping map[string]string
-	}{}
-	if err = mapstructure.Decode(v, &t); err != nil {
-		return
-	}
-	s = &ClientMapping{
-		From: identity,
-	}
-	s.Mapping, err = parseComboMapping(t.Mapping)
-	return
-}
+func parsePullACLs(v interface{}) (m map[string]DatasetMapFilter, err error) {
+	var asMap map[string]map[string]interface{}
+	if err = mapstructure.Decode(v, &asMap); err != nil {
+		return
+	}
+	m = make(map[string]DatasetMapFilter, len(asMap))
+	for identity, entry := range asMap {
+		parsePullACL := func() (filter DatasetMapFilter, err error) {
+			filterMap, ok := entry["filter"]
+			if !ok {
+				err = fmt.Errorf("no filter specified")
+				return
+			}
+			filter, err = parseDatasetMapFilter(filterMap, true)
+			return
+		}
+		filter, filterErr := parsePullACL()
+		if filterErr != nil {
+			err = fmt.Errorf("cannot parse pull-ACL for identity '%s': %s", identity, filterErr)
+			return
+		}
+		m[identity] = filter
+	}
+	return
+}
-func parseComboMapping(m map[string]string) (c zfs.ComboMapping, err error) {
-	c.Mappings = make([]zfs.DatasetMapping, 0, len(m))
-	for lhs, rhs := range m {
-		if lhs == "*" && strings.HasPrefix(rhs, "!") {
-			m := zfs.ExecMapping{}
-			fields := strings.Fields(strings.TrimPrefix(rhs, "!"))
-			if len(fields) < 1 {
-				err = errors.New("ExecMapping without acceptor path")
-				return
-			}
-			m.Name = fields[0]
-			m.Args = fields[1:]
-			c.Mappings = append(c.Mappings, m)
-		} else if strings.HasSuffix(lhs, "*") {
-			m := zfs.GlobMapping{}
-			m.PrefixPath, err = zfs.NewDatasetPath(strings.TrimSuffix(lhs, "*"))
-			if err != nil {
-				return
-			}
-			if m.TargetRoot, err = zfs.NewDatasetPath(rhs); err != nil {
-				return
-			}
-			c.Mappings = append(c.Mappings, m)
-		} else {
-			m := zfs.DirectMapping{}
-			if lhs == "|" {
-				m.Source = nil
-			} else {
-				if m.Source, err = zfs.NewDatasetPath(lhs); err != nil {
-					return
-				}
-			}
-			if m.Target, err = zfs.NewDatasetPath(rhs); err != nil {
-				return
-			}
-			c.Mappings = append(c.Mappings, m)
-		}
-	}
-	return
-}
+func parseDatasetMapFilter(mi interface{}, filterOnly bool) (f DatasetMapFilter, err error) {
+	var m map[string]string
+	if err = mapstructure.Decode(mi, &m); err != nil {
+		err = fmt.Errorf("maps / filters must be specified as map[string]string: %s", err)
+		return
+	}
+	f = NewDatasetMapFilter(len(m), filterOnly)
+	for pathPattern, mapping := range m {
+		if err = f.Add(pathPattern, mapping); err != nil {
+			err = fmt.Errorf("invalid mapping entry ['%s':'%s']: %s", pathPattern, mapping, err)
+			return
+		}
+	}
+	return
+}
 func (t SSHTransport) Connect(rpcLog Logger) (r rpc.RPCRequester, err error) {
 	var stream io.ReadWriteCloser
 	var rpcTransport sshbytestream.SSHTransport
@@ -551,7 +527,7 @@ func parsePrune(e map[string]interface{}, name string) (prune *Prune, err error)
 	prune.RetentionPolicy = NewRetentionGrid(intervals)
 	// Parse filters
-	if prune.DatasetFilter, err = parseComboMapping(i.DatasetFilter); err != nil {
+	if prune.DatasetFilter, err = parseDatasetMapFilter(i.DatasetFilter, true); err != nil {
 		err = fmt.Errorf("cannot parse dataset filter: %s", err)
 		return
 	}
@@ -746,7 +722,7 @@ func parseAutosnap(m interface{}, name string) (a *Autosnap, err error) {
 		err = fmt.Errorf("dataset_filter not specified")
 		return
 	}
-	if a.DatasetFilter, err = parseComboMapping(i.DatasetFilter); err != nil {
+	if a.DatasetFilter, err = parseDatasetMapFilter(i.DatasetFilter, true); err != nil {
 		err = fmt.Errorf("cannot parse dataset filter: %s", err)
 	}

cmd/config_mapfilter.go (new file, 164 lines)
View File

@@ -0,0 +1,164 @@
package cmd

import (
	"errors"
	"fmt"
	"strings"

	"github.com/zrepl/zrepl/zfs"
)

type DatasetMapFilter struct {
	entries []datasetMapFilterEntry

	// if set, only valid filter entries can be added using Add()
	// and Map() will always return an error
	filterOnly bool
}

type datasetMapFilterEntry struct {
	path zfs.DatasetPath
	// the mapping. since this datastructure acts as both mapping and filter
	// we have to convert it to the desired rep dynamically
	mapping      string
	subtreeMatch bool
}

var NoMatchError error = errors.New("no match found in mapping")

func NewDatasetMapFilter(capacity int, filterOnly bool) DatasetMapFilter {
	return DatasetMapFilter{
		entries:    make([]datasetMapFilterEntry, 0, capacity),
		filterOnly: filterOnly,
	}
}

func (m *DatasetMapFilter) Add(pathPattern, mapping string) (err error) {

	if m.filterOnly {
		if _, err = parseDatasetFilterResult(mapping); err != nil {
			return
		}
	}

	// assert path glob adheres to spec
	const SUBTREE_PATTERN string = "<"
	patternCount := strings.Count(pathPattern, SUBTREE_PATTERN)
	switch {
	case patternCount > 1:
		fallthrough
	case patternCount == 1 && !strings.HasSuffix(pathPattern, SUBTREE_PATTERN):
		err = fmt.Errorf("pattern invalid: only one '<' at end of string allowed")
		return
	}

	var path zfs.DatasetPath
	pathStr := strings.TrimSuffix(pathPattern, SUBTREE_PATTERN)
	path, err = zfs.NewDatasetPath(pathStr)
	if err != nil {
		err = fmt.Errorf("pattern is not a dataset path: %s", err)
		return
	}

	entry := datasetMapFilterEntry{
		path:         path,
		mapping:      mapping,
		subtreeMatch: patternCount > 0,
	}
	m.entries = append(m.entries, entry)
	return
}

// find the most specific prefix mapping we have
//
// longer prefix wins over shorter prefix, direct wins over glob
func (m DatasetMapFilter) mostSpecificPrefixMapping(path zfs.DatasetPath) (idx int, found bool) {
	lcp, lcp_entry_idx := -1, -1
	direct_idx := -1
	for e := range m.entries {
		entry := m.entries[e]
		ep := m.entries[e].path
		lep := ep.Length()
		switch {
		case !entry.subtreeMatch && ep.Equal(path):
			direct_idx = e
			continue
		case entry.subtreeMatch && path.HasPrefix(ep) && lep > lcp:
			lcp = lep
			lcp_entry_idx = e
		default:
			continue
		}
	}

	if lcp_entry_idx >= 0 || direct_idx >= 0 {
		found = true
		switch {
		case direct_idx >= 0:
			idx = direct_idx
		case lcp_entry_idx >= 0:
			idx = lcp_entry_idx
		}
	}
	return
}

func (m DatasetMapFilter) Map(source zfs.DatasetPath) (target zfs.DatasetPath, err error) {

	if m.filterOnly {
		err = fmt.Errorf("using a filter for mapping simply does not work")
		return
	}

	mi, hasMapping := m.mostSpecificPrefixMapping(source)
	if !hasMapping {
		err = NoMatchError
		return
	}
	me := m.entries[mi]

	target, err = zfs.NewDatasetPath(me.mapping)
	if err != nil {
		err = fmt.Errorf("mapping target is not a dataset path: %s", err)
		return
	}
	if m.entries[mi].subtreeMatch {
		// strip common prefix
		extendComps := source.Copy()
		if me.path.Empty() {
			// special case: trying to map the root => strip first component
			extendComps.TrimNPrefixComps(1)
		} else {
			extendComps.TrimPrefix(me.path)
		}
		target.Extend(extendComps)
	}
	return
}

func (m DatasetMapFilter) Filter(p zfs.DatasetPath) (pass bool, err error) {
	mi, hasMapping := m.mostSpecificPrefixMapping(p)
	if !hasMapping {
		pass = false
		return
	}
	me := m.entries[mi]
	pass, err = parseDatasetFilterResult(me.mapping)
	return
}

// Parse a dataset filter result
func parseDatasetFilterResult(result string) (pass bool, err error) {
	l := strings.ToLower(result)
	switch l {
	case "ok":
		pass = true
		return
	case "omit":
		return
	default:
		err = fmt.Errorf("'%s' is not a valid filter result", result)
		return
	}
}
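The precedence comment on mostSpecificPrefixMapping can be made concrete with a small sketch (hypothetical code, assumed to live in package cmd alongside the file above; expected values mirror filter1 in cmd/config_test.go below):

func sketchFilter() {
	f := NewDatasetMapFilter(3, true)
	_ = f.Add("<", "omit")    // matches everything: omit by default
	_ = f.Add("a<", "ok")     // ...but pass the subtree at and below "a"
	_ = f.Add("a/b<", "omit") // ...except the subtree at and below "a/b"

	p, _ := zfs.NewDatasetPath("a/b/c")
	pass, _ := f.Filter(p)
	println(pass) // false: "a/b<" is the longest matching prefix
}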

View File

@@ -1,10 +1,12 @@
 package cmd
 import (
-	"github.com/stretchr/testify/assert"
-	"github.com/zrepl/zrepl/util"
 	"testing"
 	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/zrepl/zrepl/util"
+	"github.com/zrepl/zrepl/zfs"
 )
 func TestSampleConfigFileIsParsedWithoutErrors(t *testing.T) {
@@ -45,3 +47,77 @@ func TestParseRetentionGridStringParsing(t *testing.T) {
 	assert.EqualValues(t, util.RetentionGridKeepCountAll, intervals[0].KeepCount)
 }
+func TestDatasetMapFilter(t *testing.T) {
+
+	expectMapping := func(m map[string]string, from, to string) {
+		dmf, err := parseDatasetMapFilter(m, false)
+		if err != nil {
+			t.Logf("expect test map to be valid: %s", err)
+			t.FailNow()
+		}
+		fromPath, err := zfs.NewDatasetPath(from)
+		if err != nil {
+			t.Logf("expect test from path to be valid: %s", err)
+			t.FailNow()
+		}
+		toPath, err := zfs.NewDatasetPath(to)
+		if err != nil {
+			t.Logf("expect test to path to be valid: %s", err)
+			t.FailNow()
+		}
+		res, err := dmf.Map(fromPath)
+		t.Logf("%s => %s", fromPath.ToString(), res.ToString())
+		assert.Nil(t, err)
+		assert.True(t, res.Equal(toPath))
+	}
+
+	expectFilter := func(m map[string]string, path string, pass bool) {
+		dmf, err := parseDatasetMapFilter(m, true)
+		if err != nil {
+			t.Logf("expect test filter to be valid: %s", err)
+			t.FailNow()
+		}
+		p, err := zfs.NewDatasetPath(path)
+		if err != nil {
+			t.Logf("expect test path to be valid: %s", err)
+			t.FailNow()
+		}
+		res, err := dmf.Filter(p)
+		assert.Nil(t, err)
+		assert.Equal(t, pass, res)
+	}
+
+	map1 := map[string]string{
+		"a/b/c<": "root1",
+		"a/b<":   "root2",
+		"<":      "root3/b/c",
+		"q<":     "root4/1/2",
+	}
+	expectMapping(map1, "a/b/c", "root1")
+	expectMapping(map1, "a/b/c/d", "root1/d")
+	expectMapping(map1, "a/b/e", "root2/e")
+	expectMapping(map1, "a/b", "root2")
+	expectMapping(map1, "x", "root3/b/c")
+	expectMapping(map1, "x/y", "root3/b/c/y")
+	expectMapping(map1, "q", "root4/1/2")
+	expectMapping(map1, "q/r", "root4/1/2/r")
+
+	filter1 := map[string]string{
+		"<":    "omit",
+		"a<":   "ok",
+		"a/b<": "omit",
+	}
+	expectFilter(filter1, "b", false)
+	expectFilter(filter1, "a", true)
+	expectFilter(filter1, "a/d", true)
+	expectFilter(filter1, "a/b", false)
+	expectFilter(filter1, "a/b/c", false)
+
+	filter2 := map[string]string{}
+	expectFilter(filter2, "foo", false) // default to omit
+}

View File

@@ -7,10 +7,14 @@ import (
 	"io"
 )
+type DatasetMapping interface {
+	Map(source zfs.DatasetPath) (target zfs.DatasetPath, err error)
+}
+
 type Handler struct {
 	Logger          Logger
-	PullACL         zfs.DatasetMapping
-	SinkMappingFunc func(clientIdentity string) (mapping zfs.DatasetMapping, err error)
+	PullACL         zfs.DatasetFilter
+	SinkMappingFunc func(clientIdentity string) (mapping DatasetMapping, err error)
 }
 func (h Handler) HandleFilesystemRequest(r rpc.FilesystemRequest) (roots []zfs.DatasetPath, err error) {
@@ -90,7 +94,7 @@ func (h Handler) HandlePullMeRequest(r rpc.PullMeRequest, clientIdentity string,
 	h.Logger.Printf("handling PullMeRequest: %#v", r)
-	var sinkMapping zfs.DatasetMapping
+	var sinkMapping DatasetMapping
 	sinkMapping, err = h.SinkMappingFunc(clientIdentity)
 	if err != nil {
 		h.Logger.Printf("no sink mapping for client identity '%s', denying PullMeRequest", clientIdentity)

View File

@@ -141,12 +141,24 @@ func cmdRun(cmd *cobra.Command, args []string) {
 }
+type localPullACL struct{}
+
+func (a localPullACL) Filter(p zfs.DatasetPath) (pass bool, err error) {
+	return true, nil
+}
+
 func jobPull(pull *Pull, log jobrun.Logger) (err error) {
 	if lt, ok := pull.From.Transport.(LocalTransport); ok {
 		lt.SetHandler(Handler{
 			Logger: log,
-			PullACL: pull.Mapping,
+			// Allow access to any dataset since we control what mapping
+			// is passed to the pull routine.
+			// All local datasets will be passed to its Map() function,
+			// but only those for which a mapping exists will actually be pulled.
+			// We can pay this small performance penalty for now.
+			PullACL: localPullACL{},
 		})
 		pull.From.Transport = lt
 		log.Printf("fixing up local transport: %#v", pull.From.Transport)
@@ -228,7 +240,7 @@ func closeRPCWithTimeout(log Logger, remote rpc.RPCRequester, timeout time.Durat
 type PullContext struct {
 	Remote            rpc.RPCRequester
 	Log               Logger
-	Mapping           zfs.DatasetMapping
+	Mapping           DatasetMapping
 	InitialReplPolicy rpc.InitialReplPolicy
 }
@@ -258,7 +270,7 @@ func doPull(pull PullContext) (err error) {
 	var localFs zfs.DatasetPath
 	localFs, err = pull.Mapping.Map(remoteFilesystems[fs])
 	if err != nil {
-		if err != zfs.NoMatchError {
+		if err != NoMatchError {
 			log.Printf("error mapping %s: %#v\n", remoteFilesystems[fs], err)
 			return err
 		}

View File

@@ -12,8 +12,9 @@ pushs:
   offsite:
     to: offsite_backups
     filter: {
-      "tank/var/db*":"ok",
-      "tank/usr/home*":"ok"
+      # like in pull_acls
+      "tank/var/db<": ok,
+      "tank/usr/home<": ok,
     }
 pulls:
@@ -36,59 +37,39 @@ pulls:
 sinks:
-  # direct mapping
+  # 1:1 mapping of remote dataset to local dataset
+  # We will reject a push request which contains > 0 datasets that do not
+  # match a mapping
   db1:
     mapping: {
+      # direct mapping
       "ssdpool/var/db/postgresql9.6":"zroot/backups/db1/pg_data"
     }
-  # "|" non-recursive wildcard
-  # the remote must present excatly one dataset, mapped to the rhs
-  cdn_master:
-    mapping: {
-      "|":"tank/srv/cdn" # NOTE: | is currently an invalid character for a ZFS dataset
-    }
-  # "*" recursive wildcard
-  # the remote may present an arbitrary set of datasets;
-  # "*" marks a recursive wildcard, i.e. map all remotes to a tree under rhs
   mirror1:
     mapping: {
-      "tank/foo/bar*":"zroot/backups/mirror1" # NOTE: * is currently an invalid character for a ZFS dataset
+      # "<" subtree wildcard matches the dataset left of < and all its children
+      "tank/foo/bar<":"zroot/backups/mirror1"
     }
-  # "*":"!..." acceptor script
-  # shell out to an accceptor that receives the remote's offered datasets
-  # on stdin and, foreach line of this input, returns the corresponding
-  # local dataset (same order) or '!<space>optional reason' on stdout
-  # If the acceptor scripts exits with non-zero status code, the remote's
-  # request will be rejected
-  complex_host:
-    mapping: {
-      "*":"!/path/to/acceptor" # we could just wire the path to the acceptor directly to the mapping
-      # but let's stick with the same type for the mapping field for now'
-      # NOTE: * and ! are currently invalid characters for a ZFS dataset
-    }
-  # Mixing the rules
-  # Mixing should be possible if there is a defined precedence (direct before *)
-  # and non-recursive wildcards are not allowed in multi-entry mapping objects
-  special_snowflake:
-    mapping: { # an explicit mapping mixed with a recursive wildcard
-      "sun/usr/home": backups/special_snowflake/homedirs,
-      "sun/var/db":   backups/special_snowflake/database,
-      "*":            backups/special_snowflake/remainingbackup
-      # NOTE: ^ alignment, should be possible, looks nicer
-    }
+  mirror2:
+    # more specific path patterns win over less specific ones
+    # direct mappings win over subtree wildcards
+    # detailed rule precedence: check unit tests & docs for exact behavior
+    # TODO subcommand to test a mapping & filter
+    mapping: {
+      "tank<":          "zroot/backups/mirror1/tank1",
+      # "tank/cdn/root<": "storage/cdn/root",
+      "tank/legacydb":  "legacypool/backups/legacydb",
+    }
 pull_acls:
-  # same synatx as in sinks, but the returned mapping does not matter
   office_backup:
-    mapping: {
-      "tank/usr/home":"notnull"
+    filter: {
+      # valid filter results (right hand side): ok, omit
+      # default is to omit
+      # rule precedence is same as for mappings
+      "tank<":          omit,
+      "tank/usr/home":  ok,
     }
@@ -98,7 +79,8 @@ prune:
     policy: grid
     grid: 6x10m | 24x1h | 7x1d | 5 x 1w | 4 x 5w
     dataset_filter: {
-      "tank/backups/*": ok
+      "tank/backups/legacyscript<": omit,
+      "tank/backups<": ok,
     }
     snapshot_filter: {
       prefix: zrepl_
@@ -108,7 +90,7 @@ prune:
     policy: grid
    grid: 1x1m(keep=all)
     dataset_filter: {
-      "pool1*": ok
+      "pool1<": ok
     }
     snapshot_filter: {
       prefix: zrepl_hfbak_
@@ -120,7 +102,7 @@ autosnap:
     prefix: zrepl_hfbak_
     interval: 1s
     dataset_filter: {
-      "pool1*": ok
+      "pool1<": ok
     }
 # prune: hfbak_prune
 # future versions may inline the retention policy here, but for now,
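As an illustration of the filter semantics in the pull_acls example above (hypothetical code, again assuming package cmd from this commit):

func sketchPullACL() {
	f := NewDatasetMapFilter(2, true)
	_ = f.Add("tank<", "omit")
	_ = f.Add("tank/usr/home", "ok")

	home, _ := zfs.NewDatasetPath("tank/usr/home")
	child, _ := zfs.NewDatasetPath("tank/usr/home/paul") // hypothetical child dataset

	passHome, _ := f.Filter(home)   // true: the direct entry wins over "tank<"
	passChild, _ := f.Filter(child) // false: only "tank<" matches => omit
	println(passHome, passChild)
}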

View File

@@ -5,7 +5,6 @@ import (
 	"github.com/spf13/cobra"
 	"github.com/zrepl/zrepl/rpc"
 	"github.com/zrepl/zrepl/sshbytestream"
-	"github.com/zrepl/zrepl/zfs"
 	"io"
 	golog "log"
 	"os"
@@ -37,8 +36,8 @@ func cmdStdinServer(cmd *cobra.Command, args []string) {
 	}
 	identity := args[0]
-	pullACL := conf.PullACLs[identity]
-	if pullACL == nil {
+	pullACL, ok := conf.PullACLs[identity]
+	if !ok {
 		err = fmt.Errorf("could not find PullACL for identity '%s'", identity)
 		return
 	}
@@ -48,19 +47,19 @@ func cmdStdinServer(cmd *cobra.Command, args []string) {
 		return
 	}
-	sinkMapping := func(identity string) (m zfs.DatasetMapping, err error) {
-		sink := conf.Sinks[identity]
-		if sink == nil {
-			return nil, fmt.Errorf("could not find sink for dataset")
+	sinkMapping := func(identity string) (m DatasetMapping, err error) {
+		sink, ok := conf.Sinks[identity]
+		if !ok {
+			return nil, fmt.Errorf("could not find sink for identity '%s'", identity)
 		}
-		return sink.Mapping, nil
+		return sink, nil
 	}
 	sinkLogger := golog.New(logOut, fmt.Sprintf("sink[%s] ", identity), logFlags)
 	handler := Handler{
 		Logger:          sinkLogger,
 		SinkMappingFunc: sinkMapping,
-		PullACL:         pullACL.Mapping,
+		PullACL:         pullACL,
 	}
 	if err = rpc.ListenByteStreamRPC(sshByteStream, identity, handler, sinkLogger); err != nil {

View File

@@ -11,20 +11,20 @@ func NewDatasetPathForest() *DatasetPathForest {
 }
 func (f *DatasetPathForest) Add(p DatasetPath) {
-	if len(p) <= 0 {
+	if len(p.comps) <= 0 {
 		panic("dataset path too short. must have length > 0")
 	}
 	// Find its root
 	var root *datasetPathTree
 	for _, r := range f.roots {
-		if r.Add(p) {
+		if r.Add(p.comps) {
 			root = r
 			break
 		}
 	}
 	if root == nil {
-		root = newDatasetPathTree(p)
+		root = newDatasetPathTree(p.comps)
 		f.roots = append(f.roots, root)
 	}
 }
@@ -57,7 +57,7 @@ type datasetPathTree struct {
 	Children []*datasetPathTree
 }
-func (t *datasetPathTree) Add(p DatasetPath) bool {
+func (t *datasetPathTree) Add(p []string) bool {
 	if len(p) == 0 {
 		return true
@@ -88,11 +88,15 @@
 }
-func (t *datasetPathTree) WalkTopDown(parent DatasetPath, visitor DatasetPathsVisitor) {
+func (t *datasetPathTree) WalkTopDown(parent []string, visitor DatasetPathsVisitor) {
 	this := append(parent, t.Component)
-	visitChildTree := visitor(DatasetPathVisit{this, t.FilledIn})
+	thisVisit := DatasetPathVisit{
+		DatasetPath{this},
+		t.FilledIn,
+	}
+	visitChildTree := visitor(thisVisit)
 	if visitChildTree {
 		for _, c := range t.Children {
@@ -102,15 +106,15 @@
 }
-func newDatasetPathTree(initial DatasetPath) (t *datasetPathTree) {
+func newDatasetPathTree(initialComps []string) (t *datasetPathTree) {
 	t = &datasetPathTree{}
 	var cur *datasetPathTree
 	cur = t
-	for i, comp := range initial {
+	for i, comp := range initialComps {
 		cur.Component = comp
 		cur.FilledIn = true
 		cur.Children = make([]*datasetPathTree, 0, 1)
-		if i == len(initial)-1 {
+		if i == len(initialComps)-1 {
 			cur.FilledIn = false // last component is not filled in
 			break
 		}

View File

@@ -7,7 +7,7 @@ import (
 func TestNewDatasetPathTree(t *testing.T) {
-	r := newDatasetPathTree(toDatasetPath("pool1/foo/bar"))
+	r := newDatasetPathTree(toDatasetPath("pool1/foo/bar").comps)
 	assert.Equal(t, "pool1", r.Component)
 	assert.True(t, len(r.Children) == 1)

View File

@@ -1,21 +1,15 @@
 package zfs
-import (
-	"bufio"
-	"errors"
-	"fmt"
-	"io"
-	"os/exec"
-)
+import "fmt"
-type DatasetMapping interface {
-	Map(source DatasetPath) (target DatasetPath, err error)
-}
+type DatasetFilter interface {
+	Filter(p DatasetPath) (pass bool, err error)
+}
-func ZFSListMapping(mapping DatasetMapping) (datasets []DatasetPath, err error) {
-	if mapping == nil {
-		panic("mapping must not be nil")
+func ZFSListMapping(filter DatasetFilter) (datasets []DatasetPath, err error) {
+	if filter == nil {
+		panic("filter must not be nil")
 	}
 	var lines [][]string
@@ -30,12 +24,11 @@ func ZFSListMapping(mapping DatasetMapping) (datasets []DatasetPath, err error)
 		return
 	}
-	_, mapErr := mapping.Map(path)
-	if mapErr != nil && mapErr != NoMatchError {
-		return nil, mapErr
+	pass, filterErr := filter.Filter(path)
+	if filterErr != nil {
+		return nil, fmt.Errorf("error calling filter: %s", filterErr)
 	}
-	if mapErr == nil {
+	if pass {
 		datasets = append(datasets, path)
 	}
@@ -43,139 +36,3 @@ func ZFSListMapping(mapping DatasetMapping) (datasets []DatasetPath, err error)
 	return
 }
-type GlobMapping struct {
-	PrefixPath DatasetPath
-	TargetRoot DatasetPath
-}
-var NoMatchError error = errors.New("no match found in mapping")
-func (m GlobMapping) Map(source DatasetPath) (target DatasetPath, err error) {
-	if len(source) < len(m.PrefixPath) {
-		err = NoMatchError
-		return
-	}
-	target = make([]string, 0, len(source)+len(m.TargetRoot))
-	target = append(target, m.TargetRoot...)
-	for si, sc := range source {
-		target = append(target, sc)
-		if si < len(m.PrefixPath) {
-			compsMatch := sc == m.PrefixPath[si]
-			endOfPrefixPath := si == len(m.PrefixPath)-1 && m.PrefixPath[si] == ""
-			if !(compsMatch || endOfPrefixPath) {
-				err = NoMatchError
-				return
-			}
-			continue
-		}
-	}
-	return
-}
-type ComboMapping struct {
-	Mappings []DatasetMapping
-}
-func (m ComboMapping) Map(source DatasetPath) (target DatasetPath, err error) {
-	for _, sm := range m.Mappings {
-		target, err = sm.Map(source)
-		if err == nil {
-			return target, err
-		}
-	}
-	return nil, NoMatchError
-}
-type DirectMapping struct {
-	Source DatasetPath
-	Target DatasetPath
-}
-func (m DirectMapping) Map(source DatasetPath) (target DatasetPath, err error) {
-	if m.Source == nil {
-		return m.Target, nil
-	}
-	if len(m.Source) != len(source) {
-		return nil, NoMatchError
-	}
-	for i, c := range source {
-		if c != m.Source[i] {
-			return nil, NoMatchError
-		}
-	}
-	return m.Target, nil
-}
-type ExecMapping struct {
-	Name string
-	Args []string
-}
-func NewExecMapping(name string, args ...string) (m *ExecMapping) {
-	m = &ExecMapping{
-		Name: name,
-		Args: args,
-	}
-	return
-}
-func (m ExecMapping) Map(source DatasetPath) (target DatasetPath, err error) {
-	var stdin io.Writer
-	var stdout io.Reader
-	cmd := exec.Command(m.Name, m.Args...)
-	if stdin, err = cmd.StdinPipe(); err != nil {
-		return
-	}
-	if stdout, err = cmd.StdoutPipe(); err != nil {
-		return
-	}
-	resp := bufio.NewScanner(stdout)
-	if err = cmd.Start(); err != nil {
-		return
-	}
-	go func() {
-		err := cmd.Wait()
-		if err != nil {
-			panic(err)
-			// fmt.Printf("error: %v\n", err) // TODO
-		}
-	}()
-	if _, err = io.WriteString(stdin, source.ToString()+"\n"); err != nil {
-		return
-	}
-	if !resp.Scan() {
-		err = errors.New(fmt.Sprintf("unexpected end of file: %v", resp.Err()))
-		return
-	}
-	t := resp.Text()
-	switch {
-	case t == "NOMAP":
-		return nil, NoMatchError
-	}
-	target = toDatasetPath(t) // TODO discover garbage?
-	return
-}

View File

@@ -1,134 +0,0 @@
package zfs

import (
	"github.com/stretchr/testify/assert"
	"testing"
)

func TestGlobMappingPrefixWildcard(t *testing.T) {
	m := GlobMapping{
		PrefixPath: toDatasetPath("a/b/c/"), // TRAILING empty component!
		TargetRoot: toDatasetPath("x/y"),
	}

	t.Logf("PrefixPath: %#v", m.PrefixPath)

	var r DatasetPath
	var err error

	r, err = m.Map(toDatasetPath("a/b/c"))
	assert.NotNil(t, err)

	r, err = m.Map(toDatasetPath("a/b/c/d"))
	assert.Nil(t, err)
	assert.Equal(t, toDatasetPath("x/y/a/b/c/d"), r)
}

func TestGlobMapping(t *testing.T) {
	m := GlobMapping{
		PrefixPath: toDatasetPath("tank/usr/home"),
		TargetRoot: toDatasetPath("backups/share1"),
	}

	var r DatasetPath
	var err error

	r, err = m.Map(toDatasetPath("tank/usr/home"))
	assert.Nil(t, err)
	assert.Equal(t, toDatasetPath("backups/share1/tank/usr/home"), r)

	r, err = m.Map(toDatasetPath("zroot"))
	assert.Equal(t, NoMatchError, err, "prefix-only match is an error")

	r, err = m.Map(toDatasetPath("zroot/notmapped"))
	assert.Equal(t, NoMatchError, err, "non-prefix is an error")
}

func TestGlobMappingWildcard(t *testing.T) {
	m := GlobMapping{
		PrefixPath: EmptyDatasetPath,
		TargetRoot: toDatasetPath("backups/share1"),
	}

	var r DatasetPath
	var err error

	r, err = m.Map(toDatasetPath("tank/usr/home"))
	assert.Equal(t, toDatasetPath("backups/share1/tank/usr/home"), r)
	assert.NoError(t, err)
}

func TestComboMapping(t *testing.T) {
	m1 := GlobMapping{
		PrefixPath: toDatasetPath("a/b"),
		TargetRoot: toDatasetPath("c/d"),
	}
	m2 := GlobMapping{
		PrefixPath: toDatasetPath("a/x"),
		TargetRoot: toDatasetPath("c/y"),
	}
	c := ComboMapping{
		Mappings: []DatasetMapping{m1, m2},
	}

	var r DatasetPath
	var err error

	p := toDatasetPath("a/b/q")

	r, err = m2.Map(p)
	assert.Equal(t, NoMatchError, err)

	r, err = c.Map(p)
	assert.Nil(t, err)
	assert.Equal(t, toDatasetPath("c/d/a/b/q"), r)
}

func TestDirectMapping(t *testing.T) {
	m := DirectMapping{
		Source: toDatasetPath("a/b/c"),
		Target: toDatasetPath("x/y/z"),
	}

	var r DatasetPath
	var err error

	r, err = m.Map(toDatasetPath("a/b/c"))
	assert.Nil(t, err)
	assert.Equal(t, m.Target, r)

	r, err = m.Map(toDatasetPath("not/matching"))
	assert.Equal(t, NoMatchError, err)

	r, err = m.Map(toDatasetPath("a/b"))
	assert.Equal(t, NoMatchError, err)
}

func TestExecMapping(t *testing.T) {
	var err error
	var m DatasetMapping
	m = NewExecMapping("test_helpers/exec_mapping_good.sh", "nostop")
	assert.NoError(t, err)

	var p DatasetPath
	p, err = m.Map(toDatasetPath("nomap/foobar"))
	assert.Equal(t, NoMatchError, err)

	p, err = m.Map(toDatasetPath("willmap/something"))
	assert.Nil(t, err)
	assert.Equal(t, toDatasetPath("didmap/willmap/something"), p)
}

View File

@@ -11,27 +11,97 @@ import (
 	"strings"
 )
-type DatasetPath []string
+type DatasetPath struct {
+	comps []string
+}
 func (p DatasetPath) ToString() string {
-	return strings.Join(p, "/")
+	return strings.Join(p.comps, "/")
 }
 func (p DatasetPath) Empty() bool {
-	return len(p) == 0
+	return len(p.comps) == 0
 }
-var EmptyDatasetPath DatasetPath = []string{}
+func (p *DatasetPath) Extend(extend DatasetPath) {
+	p.comps = append(p.comps, extend.comps...)
+}
+func (p DatasetPath) HasPrefix(prefix DatasetPath) bool {
+	if len(prefix.comps) > len(p.comps) {
+		return false
+	}
+	for i := range prefix.comps {
+		if prefix.comps[i] != p.comps[i] {
+			return false
+		}
+	}
+	return true
+}
+func (p *DatasetPath) TrimPrefix(prefix DatasetPath) {
+	if !p.HasPrefix(prefix) {
+		return
+	}
+	prelen := len(prefix.comps)
+	newlen := len(p.comps) - prelen
+	oldcomps := p.comps
+	p.comps = make([]string, newlen)
+	for i := 0; i < newlen; i++ {
+		p.comps[i] = oldcomps[prelen+i]
+	}
+	return
+}
+func (p *DatasetPath) TrimNPrefixComps(n int) {
+	if len(p.comps) < n {
+		n = len(p.comps)
+	}
+	if n == 0 {
+		return
+	}
+	p.comps = p.comps[n:]
+}
+func (p DatasetPath) Equal(q DatasetPath) bool {
+	if len(p.comps) != len(q.comps) {
+		return false
+	}
+	for i := range p.comps {
+		if p.comps[i] != q.comps[i] {
+			return false
+		}
+	}
+	return true
+}
+func (p DatasetPath) Length() int {
+	return len(p.comps)
+}
+func (p DatasetPath) Copy() (c DatasetPath) {
+	c.comps = make([]string, len(p.comps))
+	copy(c.comps, p.comps)
+	return
+}
 func NewDatasetPath(s string) (p DatasetPath, err error) {
 	if s == "" {
-		return EmptyDatasetPath, nil // the empty dataset path
+		p.comps = make([]string, 0)
+		return p, nil // the empty dataset path
 	}
-	const FORBIDDEN = "@#|\t "
+	const FORBIDDEN = "@#|\t <>*"
 	if strings.ContainsAny(s, FORBIDDEN) { // TODO space may be a bit too restrictive...
-		return nil, errors.New(fmt.Sprintf("path '%s' contains forbidden characters (any of '%s')", s, FORBIDDEN))
+		err = fmt.Errorf("contains forbidden characters (any of '%s')", FORBIDDEN)
+		return
 	}
-	return strings.Split(s, "/"), nil
+	p.comps = strings.Split(s, "/")
+	if p.comps[len(p.comps)-1] == "" {
+		err = fmt.Errorf("must not end with a '/'")
+		return
+	}
+	return
 }
 func toDatasetPath(s string) DatasetPath {
@@ -42,8 +112,6 @@ func toDatasetPath(s string) DatasetPath {
 	return p
 }
-type DatasetFilter func(path DatasetPath) bool
 type ZFSError struct {
 	Stderr  []byte
 	WaitErr error
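A brief sketch of the struct-based DatasetPath from a client's perspective, using only the exported API added above:

package main

import (
	"fmt"

	"github.com/zrepl/zrepl/zfs"
)

func main() {
	p, _ := zfs.NewDatasetPath("tank/usr/home")
	prefix, _ := zfs.NewDatasetPath("tank")
	p.TrimPrefix(prefix)
	fmt.Println(p.ToString()) // usr/home

	// '<', '>' and '*' are now forbidden characters, and a trailing '/' is rejected:
	if _, err := zfs.NewDatasetPath("tank/usr/"); err != nil {
		fmt.Println(err) // must not end with a '/'
	}
}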

View File

@@ -17,3 +17,14 @@ func TestZFSListHandlesProducesZFSErrorOnNonZeroExit(t *testing.T) {
 	assert.True(t, ok)
 	assert.Equal(t, "error: this is a mock\n", string(zfsError.Stderr))
 }
+func TestDatasetPathTrimNPrefixComps(t *testing.T) {
+	p, err := NewDatasetPath("foo/bar/a/b")
+	assert.Nil(t, err)
+	p.TrimNPrefixComps(2)
+	assert.True(t, p.Equal(toDatasetPath("a/b")))
+	p.TrimNPrefixComps(2)
+	assert.True(t, p.Empty())
+	p.TrimNPrefixComps(1)
+	assert.True(t, p.Empty(), "empty trimming shouldn't do harm")
+}