config: restructure into 'jobs' and 'global' sections

Christian Schwarz 2017-09-11 13:43:18 +02:00
parent fa4d2098a8
commit f3689563b5
7 changed files with 108 additions and 60 deletions

View File

@@ -8,9 +8,18 @@ import (
 )
 
 type Config struct {
+	Global Global
 	Jobs map[string]Job
 }
 
+type Global struct {
+	Serve struct {
+		Stdinserver struct {
+			SockDir string
+		}
+	}
+}
+
 type RPCConnecter interface {
 	Connect() (rpc.RPCClient, error)
 }
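
The new nested Global struct mirrors the global: section of the YAML config key for key. Two properties of mapstructure make this layout convenient: keys are matched case-insensitively (so sockdir maps to SockDir), and Decode only overwrites fields that are present in its input, which is what lets parseConfig pre-seed SockDir as a default further down. A minimal standalone sketch of that behavior (illustrative only, not code from this commit):

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

type Global struct {
	Serve struct {
		Stdinserver struct {
			SockDir string
		}
	}
}

func main() {
	var g Global
	// Pre-seed the default; Decode leaves it untouched unless the input
	// actually contains a sockdir key.
	g.Serve.Stdinserver.SockDir = "/var/run/zrepl/stdinserver"

	// Shape equivalent to:  global: { serve: { stdinserver: { sockdir: /tmp/sock } } }
	in := map[string]interface{}{
		"serve": map[string]interface{}{
			"stdinserver": map[string]interface{}{
				"sockdir": "/tmp/sock",
			},
		},
	}
	if err := mapstructure.Decode(in, &g); err != nil {
		panic(err)
	}
	fmt.Println(g.Serve.Stdinserver.SockDir) // /tmp/sock; stays the default if omitted
}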

View File

@@ -3,10 +3,10 @@ package cmd
 
 import (
 	"io/ioutil"
+	"fmt"
 	"github.com/mitchellh/mapstructure"
 	"github.com/pkg/errors"
 	yaml "gopkg.in/yaml.v2"
-	"fmt"
 )
 
 func ParseConfig(path string) (config *Config, err error) {
@@ -30,43 +30,76 @@ func ParseConfig(path string) (config *Config, err error) {
 
 func parseConfig(i interface{}) (c *Config, err error) {
 
-	var jm map[string]map[string]interface{}
-	if err := mapstructure.Decode(i, &jm); err != nil {
-		return nil, errors.Wrap(err, "config must be a dict with job name as key and jobs as values")
-	}
-
-	c = &Config{
-		Jobs: make(map[string]Job, len(jm)),
-	}
-
-	for name := range jm {
-
-		c.Jobs[name], err = parseJob(name, jm[name])
-		if err != nil {
-			err = errors.Wrapf(err, "cannot parse job '%s'", name)
-			return nil, err
-		}
-	}
+	var asMap struct {
+		Global map[string]interface{}
+		Jobs   []map[string]interface{}
+	}
+	if err := mapstructure.Decode(i, &asMap); err != nil {
+		return nil, errors.Wrap(err, "config root must be a dict")
+	}
+
+	c = &Config{}
+
+	// Parse global with defaults
+	c.Global.Serve.Stdinserver.SockDir = "/var/run/zrepl/stdinserver"
+	err = mapstructure.Decode(asMap.Global, &c.Global)
+	if err != nil {
+		err = errors.Wrap(err, "cannot parse global section")
+		return
+	}
+
+	// Parse Jobs
+	c.Jobs = make(map[string]Job, len(asMap.Jobs))
+	for i := range asMap.Jobs {
+		job, err := parseJob(asMap.Jobs[i])
+		if err != nil {
+			// Try to find its name
+			namei, ok := asMap.Jobs[i]["name"]
+			if !ok {
+				namei = fmt.Sprintf("<no name, %d in list>", i)
+			}
+			err = errors.Wrapf(err, "cannot parse job '%v'", namei)
+			return nil, err
+		}
+		c.Jobs[job.JobName()] = job
+	}
 
 	return c, nil
 }
 
-func parseJob(name string, i map[string]interface{}) (j Job, err error) {
-
-	jobtype_i, ok := i["type"]
-	if !ok {
-		err = errors.New("must have field 'type'")
-		return nil, err
-	}
-	jobtype_str, ok := jobtype_i.(string)
-	if !ok {
-		err = errors.New("'type' field must have type string")
-		return nil, err
-	}
-
-	switch jobtype_str {
+func extractStringField(i map[string]interface{}, key string, notempty bool) (field string, err error) {
+	vi, ok := i[key]
+	if !ok {
+		err = errors.Errorf("must have field '%s'", key)
+		return "", err
+	}
+	field, ok = vi.(string)
+	if !ok {
+		err = errors.Errorf("'%s' field must have type string", key)
+		return "", err
+	}
+	if notempty && len(field) <= 0 {
+		err = errors.Errorf("'%s' field must not be empty", key)
+		return "", err
+	}
+	return
+}
+
+func parseJob(i map[string]interface{}) (j Job, err error) {
+
+	name, err := extractStringField(i, "name", true)
+	if err != nil {
+		return
+	}
+
+	jobtype, err := extractStringField(i, "type", true)
+	if err != nil {
+		return
+	}
+
+	switch jobtype {
 	case "pull":
 		return parsePullJob(name, i)
 	case "source":
@@ -74,7 +107,7 @@ func parseJob(name string, i map[string]interface{}) (j Job, err error) {
 	case "local":
 		return parseLocalJob(name, i)
 	default:
-		return nil, errors.Errorf("unknown job type '%s'", jobtype_str)
+		return nil, errors.Errorf("unknown job type '%s'", jobtype)
 	}
 
 	panic("implementation error")
@@ -83,22 +116,17 @@ func parseJob(name string, i map[string]interface{}) (j Job, err error) {
 }
 
 func parseConnect(i map[string]interface{}) (c RPCConnecter, err error) {
-	type_i, ok := i["type"]
-	if !ok {
-		err = errors.New("must have field 'type'")
-		return
-	}
-	type_str, ok := type_i.(string)
-	if !ok {
-		err = errors.New("'type' field must have type string")
+	t, err := extractStringField(i, "type", true)
+	if err != nil {
 		return nil, err
 	}
-	switch type_str {
+	switch t {
 	case "ssh+stdinserver":
 		return parseSSHStdinserverConnecter(i)
 	default:
-		return nil, errors.Errorf("unknown connection type '%s'", type_str)
+		return nil, errors.Errorf("unknown connection type '%s'", t)
 	}
 
 	panic("implementation error")
@@ -131,9 +159,8 @@ err:
 
 func parsePrunePolicy(v map[string]interface{}) (p PrunePolicy, err error) {
 
-	policyName, ok := v["policy"]
-	if !ok {
-		err = errors.Errorf("policy name not specified")
+	policyName, err := extractStringField(v, "policy", true)
+	if err != nil {
 		return
 	}
@@ -152,22 +179,16 @@ func parsePrunePolicy(v map[string]interface{}) (p PrunePolicy, err error) {
 
 func parseAuthenticatedChannelListenerFactory(v map[string]interface{}) (p AuthenticatedChannelListenerFactory, err error) {
 
-	t, ok := v["type"]
-	if !ok {
-		err = errors.Errorf("must specify 'type' field")
-		return
-	}
-	s, ok := t.(string)
-	if !ok {
-		err = errors.Errorf("'type' must be a string")
-		return
+	t, err := extractStringField(v, "type", true)
+	if err != nil {
+		return nil, err
 	}
 
-	switch s{
+	switch t {
 	case "stdinserver":
 		return parseStdinserverListenerFactory(v)
 	default:
-		err = errors.Errorf("unknown type '%s'", s)
+		err = errors.Errorf("unknown type '%s'", t)
 		return
 	}
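
Putting the two halves together: ParseConfig unmarshals the file with yaml.v2 into a generic tree, and parseConfig then uses mapstructure to split that tree into the global map and the jobs list before dispatching each job on its extracted name and type fields. A hedged standalone sketch of that pipeline (the YAML document is made up for illustration, and the per-type job dispatch is elided):

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
	yaml "gopkg.in/yaml.v2"
)

const doc = `
global:
  serve:
    stdinserver:
      sockdir: /var/run/zrepl/stdinserver
jobs:
- name: fullbackup_prod1
  type: pull
- name: mirror_local
  type: local
`

func main() {
	// Step 1: YAML into a generic tree, as ParseConfig does with the file contents.
	var i interface{}
	if err := yaml.Unmarshal([]byte(doc), &i); err != nil {
		panic(err)
	}

	// Step 2: the generic tree into the 'global' map and the 'jobs' list,
	// mirroring the anonymous asMap struct in parseConfig.
	var asMap struct {
		Global map[string]interface{}
		Jobs   []map[string]interface{}
	}
	if err := mapstructure.Decode(i, &asMap); err != nil {
		panic(err)
	}

	for _, j := range asMap.Jobs {
		fmt.Println(j["name"], j["type"]) // fullbackup_prod1 pull, mirror_local local
	}
}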

View File

@@ -1,5 +1,5 @@
-mirror_local:
+jobs:
+- name: mirror_local
   type: local
 
   # snapshot the filesystems matched by the left-hand-side of the mapping

View File

@@ -1,7 +1,7 @@
-fullbackup_prod1:
+jobs:
+- name: fullbackup_prod1
+  # connect to remote using ssh / stdinserver command
   type: pull
-  # connect to remote using ssh / stdinserver command
   connect:
     type: ssh+stdinserver
     host: prod1.example.com

View File

@@ -1,14 +1,30 @@
-fullbackup_prod1:
+global:
+  serve:
+    stdinserver:
+      # Directory where AF_UNIX sockets for stdinserver command are placed.
+      #
+      # `zrepl stdinserver CLIENT_IDENTITY`
+      # * connects to the socket in $sockdir/CLIENT_IDENTITY
+      # * sends its stdin / stdout file descriptors to the `zrepl daemon` process (see cmsg(3))
+      # * does nothing more
+      #
+      # This enables a setup where `zrepl daemon` is not directly exposed to the internet
+      # but instead all traffic is tunnelled through SSH.
+      # The server with the source job has an authorized_keys file entry for the public key
+      # used by the corresponding pull job
+      #
+      #   command="/mnt/zrepl stdinserver CLIENT_IDENTITY" ssh-ed25519 AAAAC3NzaC1E... zrepl@pullingserver
+      #
+      # Below is the default value.
+      sockdir: /var/run/zrepl/stdinserver
+
+jobs:
+- name: fullbackup_prod1
   # expect remote to connect via ssh+stdinserver with fullbackup_prod1 as client_identity
   type: source
   serve:
-    # Creates an AF_UNIX socket with name client_identity in a well-known directory
-    # private to the zrepl user (which runs both the zrepld and the stdinserver command via authorized_keys)
-    # The stdinserver command connects to that socket and sends its stdin and stdout
-    # file descriptors over that UNIX socket to the zrepld.
-    # This avoids additional memory-to-memory copies and is more portable than splice(2) on Linux.
-    type: stdinserver
+    type: stdinserver # see global.serve.stdinserver for explanation
     client_identity: fullbackup_prod1
 
   # snapshot these filesystems every 10m with zrepl_ as prefix
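
The comment block above describes the handover in prose: the `zrepl stdinserver` process passes its stdin/stdout file descriptors to the daemon over an AF_UNIX socket using SCM_RIGHTS ancillary data, which is what the cmsg(3) reference points at. A minimal sketch of the client side of such a handover (the socket path is assumed from the sockdir layout above; error handling is simplified, and this is not zrepl's actual implementation):

package main

import (
	"net"
	"os"
	"syscall"
)

func main() {
	// Assumed path layout: $sockdir/CLIENT_IDENTITY (see sockdir above).
	sockPath := "/var/run/zrepl/stdinserver/fullbackup_prod1"

	conn, err := net.DialUnix("unix", nil, &net.UnixAddr{Name: sockPath, Net: "unix"})
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Pack the stdin and stdout fds into an SCM_RIGHTS control message.
	oob := syscall.UnixRights(int(os.Stdin.Fd()), int(os.Stdout.Fd()))

	// Send one dummy payload byte alongside the ancillary data; the daemon
	// receives copies of the fds and can serve the connection directly.
	if _, _, err := conn.WriteMsgUnix([]byte{0}, oob, nil); err != nil {
		panic(err)
	}
}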

View File

@@ -1,4 +1,5 @@
-fullbackup_prod1:
+jobs:
+- name: fullbackup_prod1
   # expect remote to connect via ssh+stdinserver with fullbackup_prod1 as client_identity
   type: push-sink

View File

@@ -1,4 +1,5 @@
-fullbackup_prod1:
+jobs:
+- name: fullbackup_prod1
   # connect to remote using ssh / stdinserver command
   type: push