zrepl/cmd/config.go

362 lines
6.9 KiB
Go
Raw Normal View History

2017-04-26 18:36:01 +02:00
package main
import (
2017-04-26 20:25:53 +02:00
"errors"
"fmt"
"github.com/jinzhu/copier"
2017-04-26 18:36:01 +02:00
"github.com/mitchellh/mapstructure"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/sshbytestream"
2017-04-26 18:36:01 +02:00
"github.com/zrepl/zrepl/zfs"
2017-04-26 20:25:53 +02:00
yaml "gopkg.in/yaml.v2"
"io"
2017-04-26 20:25:53 +02:00
"io/ioutil"
2017-04-26 18:36:01 +02:00
"strings"
)
const LOCAL_TRANSPORT_IDENTITY string = "local"
2017-04-26 18:36:01 +02:00
type Pool struct {
Name string
Transport Transport
2017-04-26 18:36:01 +02:00
}
type Transport interface {
Connect() (rpc.RPCRequester, error)
}
type LocalTransport struct {
Handler rpc.RPCHandler
}
type SSHTransport struct {
Host string
User string
Port uint16
IdentityFile string `mapstructure:"identity_file"`
TransportOpenCommand []string `mapstructure:"transport_open_command"`
SSHCommand string `mapstructure:"ssh_command"`
Options []string
}
2017-04-26 18:36:01 +02:00
type Push struct {
To *Pool
2017-04-26 18:36:01 +02:00
Datasets []zfs.DatasetPath
}
type Pull struct {
From *Pool
2017-04-26 20:25:53 +02:00
Mapping zfs.DatasetMapping
2017-04-26 18:36:01 +02:00
}
type ClientMapping struct {
2017-04-26 20:25:53 +02:00
From string
Mapping zfs.DatasetMapping
2017-04-26 18:36:01 +02:00
}
type Config struct {
Pools []Pool
Pushs []Push
Pulls []Pull
Sinks []ClientMapping
PullACLs []ClientMapping
2017-04-26 18:36:01 +02:00
}
func ParseConfig(path string) (config Config, err error) {
c := make(map[string]interface{}, 0)
var bytes []byte
if bytes, err = ioutil.ReadFile(path); err != nil {
return
}
if err = yaml.Unmarshal(bytes, &c); err != nil {
return
}
return parseMain(c)
}
func parseMain(root map[string]interface{}) (c Config, err error) {
if c.Pools, err = parsePools(root["pools"]); err != nil {
return
}
poolLookup := func(name string) (*Pool, error) {
for _, pool := range c.Pools {
if pool.Name == name {
return &pool, nil
}
}
return nil, errors.New(fmt.Sprintf("pool '%s' not defined", name))
}
if c.Pushs, err = parsePushs(root["pushs"], poolLookup); err != nil {
2017-04-26 18:36:01 +02:00
return
}
if c.Pulls, err = parsePulls(root["pulls"], poolLookup); err != nil {
2017-04-26 18:36:01 +02:00
return
}
if c.Sinks, err = parseClientMappings(root["sinks"]); err != nil {
return
}
if c.PullACLs, err = parseClientMappings(root["pull_acls"]); err != nil {
2017-04-26 18:36:01 +02:00
return
}
return
}
func parsePools(v interface{}) (pools []Pool, err error) {
asList := make([]struct {
Name string
Transport map[string]interface{}
}, 0)
if err = mapstructure.Decode(v, &asList); err != nil {
return
}
pools = make([]Pool, len(asList))
for i, p := range asList {
if p.Name == LOCAL_TRANSPORT_IDENTITY {
err = errors.New(fmt.Sprintf("pool name '%s' reserved for local pulls", LOCAL_TRANSPORT_IDENTITY))
return
}
var transport Transport
if transport, err = parseTransport(p.Transport); err != nil {
return
}
pools[i] = Pool{
Name: p.Name,
Transport: transport,
}
}
2017-04-26 18:36:01 +02:00
return
}
func parseTransport(it map[string]interface{}) (t Transport, err error) {
if len(it) != 1 {
err = errors.New("ambiguous transport type")
return
}
for key, val := range it {
switch key {
case "ssh":
t := SSHTransport{}
if err = mapstructure.Decode(val, &t); err != nil {
err = errors.New(fmt.Sprintf("could not parse ssh transport: %s", err))
return nil, err
}
return t, nil
default:
return nil, errors.New(fmt.Sprintf("unknown transport type '%s'\n", key))
}
}
return // unreachable
}
type poolLookup func(name string) (*Pool, error)
func parsePushs(v interface{}, pl poolLookup) (p []Push, err error) {
2017-04-26 18:36:01 +02:00
2017-04-26 20:25:53 +02:00
asList := make([]struct {
To string
2017-04-26 18:36:01 +02:00
Datasets []string
}, 0)
if err = mapstructure.Decode(v, &asList); err != nil {
return
}
p = make([]Push, len(asList))
for i, e := range asList {
var toPool *Pool
if toPool, err = pl(e.To); err != nil {
return
}
2017-04-26 18:36:01 +02:00
push := Push{
To: toPool,
2017-04-26 18:36:01 +02:00
Datasets: make([]zfs.DatasetPath, len(e.Datasets)),
}
for i, ds := range e.Datasets {
if push.Datasets[i], err = zfs.NewDatasetPath(ds); err != nil {
return
}
}
p[i] = push
2017-04-26 18:36:01 +02:00
}
return
}
func parsePulls(v interface{}, pl poolLookup) (p []Pull, err error) {
2017-04-26 18:36:01 +02:00
2017-04-26 20:25:53 +02:00
asList := make([]struct {
From string
2017-04-26 18:36:01 +02:00
Mapping map[string]string
}, 0)
if err = mapstructure.Decode(v, &asList); err != nil {
return
}
p = make([]Pull, len(asList))
for i, e := range asList {
var fromPool *Pool
if e.From == LOCAL_TRANSPORT_IDENTITY {
fromPool = &Pool{
Name: "local",
Transport: LocalTransport{},
}
} else {
if fromPool, err = pl(e.From); err != nil {
return
}
}
2017-04-30 17:56:11 +02:00
2017-04-26 18:36:01 +02:00
pull := Pull{
From: fromPool,
2017-04-26 18:36:01 +02:00
}
if pull.Mapping, err = parseComboMapping(e.Mapping); err != nil {
return
}
p[i] = pull
2017-04-26 18:36:01 +02:00
}
return
}
func expectList(v interface{}) (asList []interface{}, err error) {
2017-04-26 18:36:01 +02:00
var ok bool
2017-04-26 20:25:53 +02:00
if asList, ok = v.([]interface{}); !ok {
err = errors.New("expected list")
}
return
}
func parseClientMappings(v interface{}) (cm []ClientMapping, err error) {
var asList []interface{}
if asList, err = expectList(v); err != nil {
return
2017-04-26 18:36:01 +02:00
}
cm = make([]ClientMapping, len(asList))
2017-04-26 18:36:01 +02:00
for i, e := range asList {
var m ClientMapping
if m, err = parseClientMapping(e); err != nil {
2017-04-26 18:36:01 +02:00
return
}
cm[i] = m
2017-04-26 18:36:01 +02:00
}
return
}
func parseClientMapping(v interface{}) (s ClientMapping, err error) {
2017-04-26 18:36:01 +02:00
t := struct {
2017-04-26 20:25:53 +02:00
From string
2017-04-26 18:36:01 +02:00
Mapping map[string]string
}{}
if err = mapstructure.Decode(v, &t); err != nil {
return
}
s.From = t.From
s.Mapping, err = parseComboMapping(t.Mapping)
return
}
func parseComboMapping(m map[string]string) (c zfs.ComboMapping, err error) {
2017-04-30 17:56:11 +02:00
c.Mappings = make([]zfs.DatasetMapping, 0, len(m))
2017-04-26 18:36:01 +02:00
2017-04-26 20:25:53 +02:00
for lhs, rhs := range m {
2017-04-26 18:36:01 +02:00
2017-04-30 17:56:11 +02:00
if lhs == "|" {
2017-04-26 18:36:01 +02:00
if len(m) != 1 {
err = errors.New("non-recursive mapping must be the only mapping for a sink")
}
m := zfs.DirectMapping{
2017-04-26 20:25:53 +02:00
Source: nil,
2017-04-26 18:36:01 +02:00
}
if m.Target, err = zfs.NewDatasetPath(rhs); err != nil {
return
}
c.Mappings = append(c.Mappings, m)
2017-04-30 17:56:11 +02:00
} else if lhs == "*" && strings.HasPrefix(rhs, "!") {
2017-04-26 18:36:01 +02:00
m := zfs.ExecMapping{}
fields := strings.Fields(strings.TrimPrefix(rhs, "!"))
if len(fields) < 1 {
err = errors.New("ExecMapping without acceptor path")
return
}
m.Name = fields[0]
m.Args = fields[1:]
c.Mappings = append(c.Mappings, m)
} else if strings.HasSuffix(lhs, "*") {
m := zfs.GlobMapping{}
m.PrefixPath, err = zfs.NewDatasetPath(strings.TrimSuffix(lhs, "*"))
if err != nil {
return
}
if m.TargetRoot, err = zfs.NewDatasetPath(rhs); err != nil {
return
}
c.Mappings = append(c.Mappings, m)
}
}
return
}
func (t SSHTransport) Connect() (r rpc.RPCRequester, err error) {
var stream io.ReadWriteCloser
var rpcTransport sshbytestream.SSHTransport
if err = copier.Copy(&rpcTransport, t); err != nil {
return
}
if stream, err = sshbytestream.Outgoing(rpcTransport); err != nil {
return
}
return rpc.ConnectByteStreamRPC(stream)
}
func (t LocalTransport) Connect() (r rpc.RPCRequester, err error) {
if t.Handler == nil {
panic("local transport with uninitialized handler")
}
return rpc.ConnectLocalRPC(t.Handler), nil
}
func (t *LocalTransport) SetHandler(handler rpc.RPCHandler) {
t.Handler = handler
}