WIP: Switch to new config format.

Don't use jobrun for the daemon; just call JobDo() once, and let each
job organize its own scheduling.

Sacrifice all the oneshot commands; they will be reintroduced as
client calls to the daemon.
Christian Schwarz 2017-09-10 16:13:05 +02:00
parent 8bf3516003
commit 73c9033583
20 changed files with 865 additions and 1291 deletions
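For orientation, the job contract this commit moves to, restated as a sketch (the Job interface and the loop body appear in the cmd/daemon.go diff below; runAll is a hypothetical name used only for illustration):

// Sketch: the daemon calls JobDo exactly once per configured job;
// anything periodic (snapshot intervals, accepting connections)
// loops inside the job implementation itself.
type Job interface {
	JobName() string
	JobDo(log Logger) (err error)
}

func runAll(jobs map[string]Job, log Logger) {
	var wg sync.WaitGroup
	for _, j := range jobs {
		wg.Add(1)
		go func(j Job) {
			defer wg.Done()
			if err := j.JobDo(log); err != nil {
				log.Printf("[%s] returned error: %+v", j.JobName(), err)
			}
		}(j)
	}
	wg.Wait() // the daemon lives exactly as long as its jobs
}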

(deleted file)
@@ -1,85 +0,0 @@
package cmd
import (
"fmt"
"os"
"sync"
"time"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/jobrun"
"github.com/zrepl/zrepl/zfs"
)
var AutosnapCmd = &cobra.Command{
Use: "autosnap",
Short: "perform automatic snapshotting",
Run: cmdAutosnap,
}
func init() {
RootCmd.AddCommand(AutosnapCmd)
}
func cmdAutosnap(cmd *cobra.Command, args []string) {
var wg sync.WaitGroup
r := jobrun.NewJobRunner(log)
wg.Add(1)
go func() {
r.Run()
wg.Done()
}()
if len(args) < 1 {
log.Printf("must specify exactly one job as positional argument")
os.Exit(1)
}
snap, ok := conf.Autosnaps[args[0]]
if !ok {
log.Printf("could not find autosnap job: %s", args[0])
os.Exit(1)
}
r.AddJob(snap)
wg.Wait()
}
type AutosnapContext struct {
Autosnap *Autosnap
}
func doAutosnap(ctx AutosnapContext, log Logger) (err error) {
snap := ctx.Autosnap
filesystems, err := zfs.ZFSListMapping(snap.DatasetFilter)
if err != nil {
return fmt.Errorf("cannot filter datasets: %s", err)
}
suffix := time.Now().In(time.UTC).Format("20060102_150405_000")
snapname := fmt.Sprintf("%s%s", snap.Prefix, suffix)
hadError := false
for _, fs := range filesystems { // optimization: use recursive snapshots / channel programs here
log.Printf("snapshotting filesystem %s@%s", fs.ToString(), snapname)
err := zfs.ZFSSnapshot(fs, snapname, false)
if err != nil {
log.Printf("error snapshotting %s: %s", fs, err)
hadError = true
}
}
if hadError {
err = fmt.Errorf("errors occurred during autosnap, check logs for details")
}
return
}

cmd/config.go (new file, +31)
@@ -0,0 +1,31 @@
package cmd
import (
"io"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/zfs"
)
type Config struct {
Jobs map[string]Job
}
type RPCConnecter interface {
Connect() (rpc.RPCClient, error)
}
type AuthenticatedChannelListenerFactory interface {
Listen() AuthenticatedChannelListener
}
type AuthenticatedChannelListener interface {
Accept() (ch io.ReadWriteCloser, err error)
}
type SSHStdinServerConnectDescr struct {
}
type PrunePolicy interface {
Prune(fs zfs.DatasetPath, versions []zfs.FilesystemVersion) (keep, remove []zfs.FilesystemVersion, err error)
}
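For orientation, a minimal sketch of the new job-keyed config format that this Config struct is decoded from, inferred from the mapstructure tags in the parsers added below; the job names, hosts, and dataset values are invented:

pull_prod:
  type: pull
  connect:
    type: ssh+stdinserver
    host: prod.example.com
    user: root
    port: 22
    identity_file: /etc/zrepl/ssh/identity
  mapping: {
    "tank/var/db": "storage/backups/prod/var/db"
  }
  initial_repl_policy: most_recent
  snapshot_prefix: zrepl_
  prune:
    policy: grid
    grid: 1x1h(keep=all) | 24x1h | 7x1d

serve_backups:
  type: source
  serve:
    type: stdinserver
    client_identity: backuphost
  datasets: {
    "tank/var/db": "ok"
  }
  snapshot_prefix: zrepl_
  interval: 10m
  prune:
    policy: grid
    grid: 1x1h(keep=all) | 24x1h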

cmd/config_connect.go (new file, +56)
@@ -0,0 +1,56 @@
package cmd
import (
"fmt"
"io"
"github.com/jinzhu/copier"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/sshbytestream"
"github.com/zrepl/zrepl/util"
)
type SSHStdinserverConnecter struct {
Host string
User string
Port uint16
IdentityFile string `mapstructure:"identity_file"`
TransportOpenCommand []string `mapstructure:"transport_open_command"`
SSHCommand string `mapstructure:"ssh_command"`
Options []string
ConnLogReadFile string `mapstructure:"connlog_read_file"`
ConnLogWriteFile string `mapstructure:"connlog_write_file"`
}
func parseSSHStdinserverConnecter(i map[string]interface{}) (c *SSHStdinserverConnecter, err error) {
c = &SSHStdinserverConnecter{}
if err = mapstructure.Decode(i, c); err != nil {
err = errors.New(fmt.Sprintf("could not parse ssh transport: %s", err))
return nil, err
}
// TODO assert fields are filled
return
}
func (c *SSHStdinserverConnecter) Connect() (client rpc.RPCClient, err error) {
var stream io.ReadWriteCloser
var rpcTransport sshbytestream.SSHTransport
if err = copier.Copy(&rpcTransport, c); err != nil {
return
}
if stream, err = sshbytestream.Outgoing(rpcTransport); err != nil {
return
}
stream, err = util.NewReadWriteCloserLogger(stream, c.ConnLogReadFile, c.ConnLogWriteFile)
if err != nil {
return
}
client = rpc.NewClient(stream)
return client, nil
}

cmd/config_fsvfilter.go (new file, +25)
@@ -0,0 +1,25 @@
package cmd
import (
"github.com/zrepl/zrepl/zfs"
"strings"
"github.com/pkg/errors"
)
type PrefixSnapshotFilter struct {
Prefix string
}
func parsePrefixSnapshotFilter(i string) (f *PrefixSnapshotFilter, err error) {
if !(len(i) > 0) {
err = errors.Errorf("snapshot prefix must be longer than 0 characters")
return
}
f = &PrefixSnapshotFilter{i}
return
}
func (f *PrefixSnapshotFilter) Filter(fsv zfs.FilesystemVersion) (accept bool, err error) {
return fsv.Type == zfs.Snapshot && strings.HasPrefix(fsv.Name, f.Prefix), nil
}

cmd/config_job_local.go (new file, +87)
@@ -0,0 +1,87 @@
package cmd
import (
"time"
"github.com/zrepl/zrepl/rpc"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)
type LocalJob struct {
Name string
Mapping *DatasetMapFilter
SnapshotFilter *PrefixSnapshotFilter
Interval time.Duration
InitialReplPolicy InitialReplPolicy
PruneLHS PrunePolicy
PruneRHS PrunePolicy
}
func parseLocalJob(name string, i map[string]interface{}) (j *LocalJob, err error) {
var asMap struct {
Mapping map[string]string
SnapshotPrefix string `mapstructure:"snapshot_prefix"`
Interval string
InitialReplPolicy string `mapstructure:"initial_repl_policy"`
PruneLHS map[string]interface{} `mapstructure:"prune_lhs"`
PruneRHS map[string]interface{} `mapstructure:"prune_rhs"`
}
if err = mapstructure.Decode(i, &asMap); err != nil {
err = errors.Wrap(err, "mapstructure error")
return nil, err
}
j = &LocalJob{Name: name}
if j.Mapping, err = parseDatasetMapFilter(asMap.Mapping, false); err != nil {
return
}
if j.SnapshotFilter, err = parsePrefixSnapshotFilter(asMap.SnapshotPrefix); err != nil {
return
}
if j.Interval, err = time.ParseDuration(asMap.Interval); err != nil {
err = errors.Wrap(err, "cannot parse interval")
return
}
if j.InitialReplPolicy, err = parseInitialReplPolicy(asMap.InitialReplPolicy, DEFAULT_INITIAL_REPL_POLICY); err != nil {
return
}
if j.PruneLHS, err = parsePrunePolicy(asMap.PruneLHS); err != nil {
err = errors.Wrap(err, "cannot parse 'prune_lhs'")
return
}
if j.PruneRHS, err = parsePrunePolicy(asMap.PruneRHS); err != nil {
err = errors.Wrap(err, "cannot parse 'prune_rhs'")
return
}
return
}
func (j *LocalJob) JobName() string {
return j.Name
}
func (j *LocalJob) JobDo(log Logger) (err error) {
local := rpc.NewLocalRPC()
handler := Handler{
Logger: log,
// Allow access to any dataset since we control what mapping
// is passed to the pull routine.
// All local datasets will be passed to its Map() function,
// but only those for which a mapping exists will actually be pulled.
// We can pay this small performance penalty for now.
PullACL: localPullACL{},
}
registerEndpoints(local, handler)
return doPull(PullContext{local, log, j.Mapping, j.InitialReplPolicy})
}
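The corresponding config section for this job type might look like this (a sketch: field names come from the asMap struct above; dataset names and durations are invented):

local_backup:
  type: local
  mapping: {
    "tank/var/db": "backuppool/zrepl/var/db"
  }
  snapshot_prefix: zrepl_
  interval: 10m
  initial_repl_policy: most_recent
  prune_lhs:
    policy: grid
    grid: 1x1h(keep=all) | 24x1h
  prune_rhs:
    policy: grid
    grid: 7x1d | 4x1w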

cmd/config_job_pull.go (new file, +78)
@@ -0,0 +1,78 @@
package cmd
import (
"time"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)
type PullJob struct {
Name string
Connect RPCConnecter
Mapping *DatasetMapFilter
SnapshotFilter *PrefixSnapshotFilter
InitialReplPolicy InitialReplPolicy
Prune PrunePolicy
}
func parsePullJob(name string, i map[string]interface{}) (j *PullJob, err error) {
var asMap struct {
Connect map[string]interface{}
Mapping map[string]string
InitialReplPolicy string `mapstructure:"initial_repl_policy"`
Prune map[string]interface{}
SnapshotPrefix string `mapstructure:"snapshot_prefix"`
}
if err = mapstructure.Decode(i, &asMap); err != nil {
err = errors.Wrap(err, "mapstructure error")
return nil, err
}
j = &PullJob{Name: name}
j.Connect, err = parseSSHStdinserverConnecter(asMap.Connect)
if err != nil {
err = errors.Wrap(err, "cannot parse 'connect'")
return nil, err
}
j.Mapping, err = parseDatasetMapFilter(asMap.Mapping, false)
if err != nil {
err = errors.Wrap(err, "cannot parse 'mapping'")
return nil, err
}
j.InitialReplPolicy, err = parseInitialReplPolicy(asMap.InitialReplPolicy, DEFAULT_INITIAL_REPL_POLICY)
if err != nil {
err = errors.Wrap(err, "cannot parse 'initial_repl_policy'")
return
}
if j.SnapshotFilter, err = parsePrefixSnapshotFilter(asMap.SnapshotPrefix); err != nil {
return
}
if j.Prune, err = parsePrunePolicy(asMap.Prune); err != nil {
err = errors.Wrap(err, "cannot parse prune policy")
return
}
return
}
func (j *PullJob) JobName() string {
return j.Name
}
func (j *PullJob) JobDo(log Logger) (err error) {
client, err := j.Connect.Connect()
if err != nil {
log.Printf("error connect: %s", err)
return err
}
defer closeRPCWithTimeout(log, client, time.Second*10, "")
return doPull(PullContext{client, log, j.Mapping, j.InitialReplPolicy})
}

cmd/config_job_source.go (new file, +100)
@@ -0,0 +1,100 @@
package cmd
import (
"time"
"github.com/zrepl/zrepl/rpc"
mapstructure "github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)
type SourceJob struct {
Name string
Serve AuthenticatedChannelListenerFactory
Datasets *DatasetMapFilter
SnapshotFilter *PrefixSnapshotFilter
Interval time.Duration
Prune PrunePolicy
}
func parseSourceJob(name string, i map[string]interface{}) (j *SourceJob, err error) {
var asMap struct {
Serve map[string]interface{}
Datasets map[string]string
SnapshotPrefix string `mapstructure:"snapshot_prefix"`
Interval string
Prune map[string]interface{}
}
if err = mapstructure.Decode(i, &asMap); err != nil {
err = errors.Wrap(err, "mapstructure error")
return nil, err
}
j = &SourceJob{Name: name}
if j.Serve, err = parseAuthenticatedChannelListenerFactory(asMap.Serve); err != nil {
return
}
if j.Datasets, err = parseDatasetMapFilter(asMap.Datasets, true); err != nil {
return
}
if j.SnapshotFilter, err = parsePrefixSnapshotFilter(asMap.SnapshotPrefix); err != nil {
return
}
if j.Interval, err = time.ParseDuration(asMap.Interval); err != nil {
err = errors.Wrap(err, "cannot parse 'interval'")
return
}
if j.Prune, err = parsePrunePolicy(asMap.Prune); err != nil {
return
}
return
}
func (j *SourceJob) JobName() string {
return j.Name
}
func (j *SourceJob) JobDo(log Logger) (err error) {
// Setup automatic snapshotting
listener, err := j.Serve.Listen()
if err != nil {
return err
}
for {
// listener does auth for us
rwc, err := listener.Accept()
if err != nil {
// if err != AuthError...
panic(err) // TODO
}
// construct connection handler
handler := Handler{
Logger: log,
PullACL: j.Datasets,
// TODO should set SinkMapping here? no, but check Handler impl
}
// handle connection
server := rpc.NewServer(rwc)
registerEndpoints(server, handler)
if err = server.Serve(); err != nil {
log.Printf("error serving connection: %s", err)
}
rwc.Close()
}
}

(modified file)
@@ -5,6 +5,7 @@ import (
 	"strings"
 	"github.com/zrepl/zrepl/zfs"
+	"github.com/mitchellh/mapstructure"
 )

 type DatasetMapFilter struct {
@@ -23,8 +24,8 @@ type datasetMapFilterEntry struct {
 	subtreeMatch bool
 }

-func NewDatasetMapFilter(capacity int, filterOnly bool) DatasetMapFilter {
-	return DatasetMapFilter{
+func NewDatasetMapFilter(capacity int, filterOnly bool) *DatasetMapFilter {
+	return &DatasetMapFilter{
 		entries:    make([]datasetMapFilterEntry, 0, capacity),
 		filterOnly: filterOnly,
 	}
@@ -158,3 +159,21 @@ func parseDatasetFilterResult(result string) (pass bool, err error) {
 	}
 	return
 }
+
+func parseDatasetMapFilter(mi interface{}, filterOnly bool) (f *DatasetMapFilter, err error) {
+	var m map[string]string
+	if err = mapstructure.Decode(mi, &m); err != nil {
+		err = fmt.Errorf("maps / filters must be specified as map[string]string: %s", err)
+		return
+	}
+	f = NewDatasetMapFilter(len(m), filterOnly)
+	for pathPattern, mapping := range m {
+		if err = f.Add(pathPattern, mapping); err != nil {
+			err = fmt.Errorf("invalid mapping entry ['%s':'%s']: %s", pathPattern, mapping, err)
+			return
+		}
+	}
+	return
+}

(deleted file)
@@ -1,775 +0,0 @@
package cmd
import (
"fmt"
"github.com/jinzhu/copier"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/jobrun"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/sshbytestream"
. "github.com/zrepl/zrepl/util"
"github.com/zrepl/zrepl/zfs"
yaml "gopkg.in/yaml.v2"
"io"
"io/ioutil"
"regexp"
"strconv"
"strings"
"time"
)
var (
JobSectionPush string = "push"
JobSectionPull string = "pull"
JobSectionPrune string = "prune"
JobSectionAutosnap string = "autosnap"
JobSectionPullACL string = "pull_acl"
JobSectionSinks string = "sink"
)
type Remote struct {
Name string
Transport Transport
}
type Transport interface {
Connect(rpcLog Logger) (rpc.RPCClient, error)
}
type LocalTransport struct {
}
type SSHTransport struct {
Host string
User string
Port uint16
IdentityFile string `mapstructure:"identity_file"`
TransportOpenCommand []string `mapstructure:"transport_open_command"`
SSHCommand string `mapstructure:"ssh_command"`
Options []string
ConnLogReadFile string `mapstructure:"connlog_read_file"`
ConnLogWriteFile string `mapstructure:"connlog_write_file"`
}
type Push struct {
jobName string // for use with jobrun package
To *Remote
Filter DatasetMapFilter
InitialReplPolicy InitialReplPolicy
RepeatStrategy jobrun.RepeatStrategy
}
type Pull struct {
jobName string // for use with jobrun package
From *Remote
Mapping DatasetMapFilter
InitialReplPolicy InitialReplPolicy
RepeatStrategy jobrun.RepeatStrategy
}
type Prune struct {
jobName string // for use with jobrun package
DatasetFilter DatasetMapFilter
SnapshotFilter zfs.FilesystemVersionFilter
RetentionPolicy *RetentionGrid // TODO abstract interface to support future policies?
Repeat jobrun.RepeatStrategy
}
type Autosnap struct {
jobName string // for use with jobrun package
Prefix string
Interval jobrun.RepeatStrategy
DatasetFilter DatasetMapFilter
}
type Config struct {
Remotes map[string]*Remote
Pushs map[string]*Push // job name -> job
Pulls map[string]*Pull // job name -> job
Sinks map[string]DatasetMapFilter // client identity -> mapping
PullACLs map[string]DatasetMapFilter // client identity -> filter
Prunes map[string]*Prune // job name -> job
Autosnaps map[string]*Autosnap // job name -> job
}
func ParseConfig(path string) (config Config, err error) {
c := make(map[string]interface{}, 0)
var bytes []byte
if bytes, err = ioutil.ReadFile(path); err != nil {
err = errors.WithStack(err)
return
}
if err = yaml.Unmarshal(bytes, &c); err != nil {
err = errors.WithStack(err)
return
}
return parseMain(c)
}
func parseMain(root map[string]interface{}) (c Config, err error) {
if c.Remotes, err = parseRemotes(root["remotes"]); err != nil {
return
}
remoteLookup := func(name string) (remote *Remote, err error) {
remote = c.Remotes[name]
if remote == nil {
err = fmt.Errorf("remote '%s' not defined", name)
}
return
}
if c.Pushs, err = parsePushs(root[JobSectionPush], remoteLookup); err != nil {
return
}
if c.Pulls, err = parsePulls(root[JobSectionPull], remoteLookup); err != nil {
return
}
if c.Sinks, err = parseSinks(root[JobSectionSinks]); err != nil {
return
}
if c.PullACLs, err = parsePullACLs(root[JobSectionPullACL]); err != nil {
return
}
if c.Prunes, err = parsePrunes(root[JobSectionPrune]); err != nil {
return
}
if c.Autosnaps, err = parseAutosnaps(root[JobSectionAutosnap]); err != nil {
return
}
return
}
func (c *Config) resolveJobName(jobname string) (i interface{}, err error) {
s := strings.SplitN(jobname, ".", 2)
if len(s) != 2 {
return nil, fmt.Errorf("invalid job name syntax (section.name)")
}
section, name := s[0], s[1]
var ok bool
switch section {
case JobSectionAutosnap:
i, ok = c.Autosnaps[name]
case JobSectionPush:
i, ok = c.Pushs[name]
case JobSectionPull:
i, ok = c.Pulls[name]
case JobSectionPrune:
i, ok = c.Prunes[name]
case JobSectionPullACL:
i, ok = c.PullACLs[name]
case JobSectionSinks:
i, ok = c.Sinks[name]
default:
return nil, fmt.Errorf("invalid section name: %s", section)
}
if !ok {
return nil, fmt.Errorf("cannot find job '%s' in section '%s'", name, section)
}
return i, nil
}
func fullJobName(section, name string) (full string, err error) {
if len(name) < 1 {
err = fmt.Errorf("job name not set")
return
}
full = fmt.Sprintf("%s.%s", section, name)
return
}
func parseRemotes(v interface{}) (remotes map[string]*Remote, err error) {
asMap := make(map[string]struct {
Transport map[string]interface{}
}, 0)
if err = mapstructure.Decode(v, &asMap); err != nil {
return
}
remotes = make(map[string]*Remote, len(asMap))
for name, p := range asMap {
if name == LOCAL_TRANSPORT_IDENTITY {
err = errors.New(fmt.Sprintf("remote name '%s' reserved for local pulls", LOCAL_TRANSPORT_IDENTITY))
return
}
var transport Transport
if transport, err = parseTransport(p.Transport); err != nil {
return
}
remotes[name] = &Remote{
Name: name,
Transport: transport,
}
}
return
}
func parseTransport(it map[string]interface{}) (t Transport, err error) {
if len(it) != 1 {
err = errors.New("ambiguous transport type")
return
}
for key, val := range it {
switch key {
case "ssh":
t := SSHTransport{}
if err = mapstructure.Decode(val, &t); err != nil {
err = errors.New(fmt.Sprintf("could not parse ssh transport: %s", err))
return nil, err
}
return t, nil
default:
return nil, errors.New(fmt.Sprintf("unknown transport type '%s'\n", key))
}
}
return // unreachable
}
type remoteLookup func(name string) (*Remote, error)
func parsePushs(v interface{}, rl remoteLookup) (p map[string]*Push, err error) {
asMap := make(map[string]struct {
To string
Filter map[string]string
InitialReplPolicy string
Repeat map[string]string
}, 0)
if err = mapstructure.Decode(v, &asMap); err != nil {
return
}
p = make(map[string]*Push, len(asMap))
for name, e := range asMap {
var toRemote *Remote
if toRemote, err = rl(e.To); err != nil {
return
}
push := &Push{
To: toRemote,
}
if push.jobName, err = fullJobName(JobSectionPush, name); err != nil {
return
}
if push.Filter, err = parseDatasetMapFilter(e.Filter, true); err != nil {
return
}
if push.InitialReplPolicy, err = parseInitialReplPolicy(e.InitialReplPolicy, DEFAULT_INITIAL_REPL_POLICY); err != nil {
return
}
if push.RepeatStrategy, err = parseRepeatStrategy(e.Repeat); err != nil {
return
}
p[name] = push
}
return
}
func parsePulls(v interface{}, rl remoteLookup) (p map[string]*Pull, err error) {
asMap := make(map[string]struct {
From string
Mapping map[string]string
InitialReplPolicy string
Repeat map[string]string
}, 0)
if err = mapstructure.Decode(v, &asMap); err != nil {
return
}
p = make(map[string]*Pull, len(asMap))
for name, e := range asMap {
if len(e.From) < 1 {
err = fmt.Errorf("source not set ('from' attribute is empty)")
return
}
var fromRemote *Remote
if e.From == LOCAL_TRANSPORT_IDENTITY {
fromRemote = &Remote{
Name: LOCAL_TRANSPORT_IDENTITY,
Transport: LocalTransport{},
}
} else {
if fromRemote, err = rl(e.From); err != nil {
return
}
}
pull := &Pull{
From: fromRemote,
}
if pull.jobName, err = fullJobName(JobSectionPull, name); err != nil {
return
}
if pull.Mapping, err = parseDatasetMapFilter(e.Mapping, false); err != nil {
return
}
if pull.InitialReplPolicy, err = parseInitialReplPolicy(e.InitialReplPolicy, DEFAULT_INITIAL_REPL_POLICY); err != nil {
return
}
if pull.RepeatStrategy, err = parseRepeatStrategy(e.Repeat); err != nil {
return
}
p[name] = pull
}
return
}
func parseInitialReplPolicy(v interface{}, defaultPolicy InitialReplPolicy) (p InitialReplPolicy, err error) {
s, ok := v.(string)
if !ok {
goto err
}
switch {
case s == "":
p = defaultPolicy
case s == "most_recent":
p = InitialReplPolicyMostRecent
case s == "all":
p = InitialReplPolicyAll
default:
goto err
}
return
err:
err = errors.New(fmt.Sprintf("expected InitialReplPolicy, got %#v", v))
return
}
func parseRepeatStrategy(r map[string]string) (s jobrun.RepeatStrategy, err error) {
if r == nil {
return jobrun.NoRepeatStrategy{}, nil
}
if repeatStr, ok := r["interval"]; ok {
d, err := parseDuration(repeatStr)
if err != nil {
return nil, err
}
s = &jobrun.PeriodicRepeatStrategy{d}
return s, err
} else {
return nil, fmt.Errorf("attribute 'interval' not found but required in repeat specification")
}
}
func expectList(v interface{}) (asList []interface{}, err error) {
var ok bool
if asList, ok = v.([]interface{}); !ok {
err = errors.New("expected list")
}
return
}
func parseSinks(v interface{}) (m map[string]DatasetMapFilter, err error) {
var asMap map[string]map[string]interface{}
if err = mapstructure.Decode(v, &asMap); err != nil {
return
}
m = make(map[string]DatasetMapFilter, len(asMap))
for identity, entry := range asMap {
parseSink := func() (mapping DatasetMapFilter, err error) {
mappingMap, ok := entry["mapping"]
if !ok {
err = fmt.Errorf("no mapping specified")
return
}
mapping, err = parseDatasetMapFilter(mappingMap, false)
return
}
mapping, sinkErr := parseSink()
if sinkErr != nil {
err = fmt.Errorf("cannot parse sink for identity '%s': %s", identity, sinkErr)
return
}
m[identity] = mapping
}
return
}
func parsePullACLs(v interface{}) (m map[string]DatasetMapFilter, err error) {
var asMap map[string]map[string]interface{}
if err = mapstructure.Decode(v, &asMap); err != nil {
return
}
m = make(map[string]DatasetMapFilter, len(asMap))
for identity, entry := range asMap {
parsePullACL := func() (filter DatasetMapFilter, err error) {
filterMap, ok := entry["filter"]
if !ok {
err = fmt.Errorf("no filter specified")
return
}
filter, err = parseDatasetMapFilter(filterMap, true)
return
}
filter, filterErr := parsePullACL()
if filterErr != nil {
err = fmt.Errorf("cannot parse pull-ACL for identity '%s': %s", identity, filterErr)
return
}
m[identity] = filter
}
return
}
func parseDatasetMapFilter(mi interface{}, filterOnly bool) (f DatasetMapFilter, err error) {
var m map[string]string
if err = mapstructure.Decode(mi, &m); err != nil {
err = fmt.Errorf("maps / filters must be specified as map[string]string: %s", err)
return
}
f = NewDatasetMapFilter(len(m), filterOnly)
for pathPattern, mapping := range m {
if err = f.Add(pathPattern, mapping); err != nil {
err = fmt.Errorf("invalid mapping entry ['%s':'%s']: %s", pathPattern, mapping, err)
return
}
}
return
}
func (t SSHTransport) Connect(rpcLog Logger) (r rpc.RPCClient, err error) {
var stream io.ReadWriteCloser
var rpcTransport sshbytestream.SSHTransport
if err = copier.Copy(&rpcTransport, t); err != nil {
return
}
if stream, err = sshbytestream.Outgoing(rpcTransport); err != nil {
return
}
stream, err = NewReadWriteCloserLogger(stream, t.ConnLogReadFile, t.ConnLogWriteFile)
if err != nil {
return
}
client := rpc.NewClient(stream)
return client, nil
}
func (t LocalTransport) Connect(rpcLog Logger) (r rpc.RPCClient, err error) {
local := rpc.NewLocalRPC()
handler := Handler{
Logger: log,
// Allow access to any dataset since we control what mapping
// is passed to the pull routine.
// All local datasets will be passed to its Map() function,
// but only those for which a mapping exists will actually be pulled.
// We can pay this small performance penalty for now.
PullACL: localPullACL{},
}
registerEndpoints(local, handler)
return local, nil
}
func parsePrunes(m interface{}) (rets map[string]*Prune, err error) {
asList := make(map[string]map[string]interface{}, 0)
if err = mapstructure.Decode(m, &asList); err != nil {
return nil, errors.Wrap(err, "mapstructure error")
}
rets = make(map[string]*Prune, len(asList))
for name, e := range asList {
var prune *Prune
if prune, err = parsePrune(e, name); err != nil {
err = fmt.Errorf("cannot parse prune job %s: %s", name, err)
return
}
rets[name] = prune
}
return
}
func parsePrune(e map[string]interface{}, name string) (prune *Prune, err error) {
// Only support grid policy for now
policyName, ok := e["policy"]
if !ok || policyName != "grid" {
err = fmt.Errorf("prune job with unimplemented policy '%s'", policyName)
return
}
var i struct {
Grid string
DatasetFilter map[string]string `mapstructure:"dataset_filter"`
SnapshotFilter map[string]string `mapstructure:"snapshot_filter"`
Repeat map[string]string
}
if err = mapstructure.Decode(e, &i); err != nil {
return
}
prune = &Prune{}
if prune.jobName, err = fullJobName(JobSectionPrune, name); err != nil {
return
}
// Parse grid policy
intervals, err := parseRetentionGridIntervalsString(i.Grid)
if err != nil {
err = fmt.Errorf("cannot parse retention grid: %s", err)
return
}
// Assert intervals are of increasing length (not necessarily required, but indicates config mistake)
lastDuration := time.Duration(0)
for i := range intervals {
if intervals[i].Length < lastDuration {
err = fmt.Errorf("retention grid interval length must be monotonically increasing:"+
"interval %d is shorter than %d", i+1, i)
return
} else {
lastDuration = intervals[i].Length
}
}
prune.RetentionPolicy = NewRetentionGrid(intervals)
// Parse filters
if prune.DatasetFilter, err = parseDatasetMapFilter(i.DatasetFilter, true); err != nil {
err = fmt.Errorf("cannot parse dataset filter: %s", err)
return
}
if prune.SnapshotFilter, err = parseSnapshotFilter(i.SnapshotFilter); err != nil {
err = fmt.Errorf("cannot parse snapshot filter: %s", err)
return
}
// Parse repeat strategy
if prune.Repeat, err = parseRepeatStrategy(i.Repeat); err != nil {
err = fmt.Errorf("cannot parse repeat strategy: %s", err)
return
}
return
}
var durationStringRegex *regexp.Regexp = regexp.MustCompile(`^\s*(\d+)\s*(s|m|h|d|w)\s*$`)
func parseDuration(e string) (d time.Duration, err error) {
comps := durationStringRegex.FindStringSubmatch(e)
if len(comps) != 3 {
err = fmt.Errorf("does not match regex: %s %#v", e, comps)
return
}
durationFactor, err := strconv.ParseInt(comps[1], 10, 64)
if err != nil {
return
}
var durationUnit time.Duration
switch comps[2] {
case "s":
durationUnit = time.Second
case "m":
durationUnit = time.Minute
case "h":
durationUnit = time.Hour
case "d":
durationUnit = 24 * time.Hour
case "w":
durationUnit = 24 * 7 * time.Hour
default:
err = fmt.Errorf("contains unknown time unit '%s'", comps[2])
return
}
d = time.Duration(durationFactor) * durationUnit
return
}
var retentionStringIntervalRegex *regexp.Regexp = regexp.MustCompile(`^\s*(\d+)\s*x\s*([^\(]+)\s*(\((.*)\))?\s*$`)
func parseRetentionGridIntervalString(e string) (intervals []RetentionInterval, err error) {
comps := retentionStringIntervalRegex.FindStringSubmatch(e)
if comps == nil {
err = fmt.Errorf("retention string does not match expected format")
return
}
times, err := strconv.Atoi(comps[1])
if err != nil {
return nil, err
} else if times <= 0 {
return nil, fmt.Errorf("contains factor <= 0")
}
duration, err := parseDuration(comps[2])
if err != nil {
return nil, err
}
keepCount := 1
if comps[3] != "" {
// Decompose key=value, comma separated
// For now, only keep_count is supported
re := regexp.MustCompile(`^\s*keep=(.+)\s*$`)
res := re.FindStringSubmatch(comps[4])
if res == nil || len(res) != 2 {
err = fmt.Errorf("interval parameter contains unknown parameters")
return
}
if res[1] == "all" {
keepCount = RetentionGridKeepCountAll
} else {
keepCount, err = strconv.Atoi(res[1])
if err != nil {
err = fmt.Errorf("cannot parse keep_count value")
return
}
}
}
intervals = make([]RetentionInterval, times)
for i := range intervals {
intervals[i] = RetentionInterval{
Length: duration,
KeepCount: keepCount,
}
}
return
}
func parseRetentionGridIntervalsString(s string) (intervals []RetentionInterval, err error) {
ges := strings.Split(s, "|")
intervals = make([]RetentionInterval, 0, 7*len(ges))
for intervalIdx, e := range ges {
parsed, err := parseRetentionGridIntervalString(e)
if err != nil {
return nil, fmt.Errorf("cannot parse interval %d of %d: %s: %s", intervalIdx+1, len(ges), err, strings.TrimSpace(e))
}
intervals = append(intervals, parsed...)
}
return
}
type prefixSnapshotFilter struct {
prefix string
}
func (f prefixSnapshotFilter) Filter(fsv zfs.FilesystemVersion) (accept bool, err error) {
return fsv.Type == zfs.Snapshot && strings.HasPrefix(fsv.Name, f.prefix), nil
}
func parseSnapshotFilter(fm map[string]string) (snapFilter zfs.FilesystemVersionFilter, err error) {
prefix, ok := fm["prefix"]
if !ok {
err = fmt.Errorf("unsupported snapshot filter")
return
}
snapFilter = prefixSnapshotFilter{prefix}
return
}
func parseAutosnaps(m interface{}) (snaps map[string]*Autosnap, err error) {
asMap := make(map[string]interface{}, 0)
if err = mapstructure.Decode(m, &asMap); err != nil {
return
}
snaps = make(map[string]*Autosnap, len(asMap))
for name, e := range asMap {
var snap *Autosnap
if snap, err = parseAutosnap(e, name); err != nil {
err = fmt.Errorf("cannot parse autonsap job %s: %s", name, err)
return
}
snaps[name] = snap
}
return
}
func parseAutosnap(m interface{}, name string) (a *Autosnap, err error) {
var i struct {
Prefix string
Interval string
DatasetFilter map[string]string `mapstructure:"dataset_filter"`
}
if err = mapstructure.Decode(m, &i); err != nil {
err = fmt.Errorf("structure unfit: %s", err)
return
}
a = &Autosnap{}
if a.jobName, err = fullJobName(JobSectionAutosnap, name); err != nil {
return
}
if len(i.Prefix) < 1 {
err = fmt.Errorf("prefix must not be empty")
return
}
a.Prefix = i.Prefix
var interval time.Duration
if interval, err = parseDuration(i.Interval); err != nil {
err = fmt.Errorf("cannot parse interval: %s", err)
return
}
a.Interval = &jobrun.PeriodicRepeatStrategy{interval}
if len(i.DatasetFilter) == 0 {
err = fmt.Errorf("dataset_filter not specified")
return
}
if a.DatasetFilter, err = parseDatasetMapFilter(i.DatasetFilter, true); err != nil {
err = fmt.Errorf("cannot parse dataset filter: %s", err)
}
return
}
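For contrast with the new format, the section-keyed shape this deleted parser expected (sketched from parseMain and the section parsers above; all names and values are invented):

remotes:
  prod:
    transport:
      ssh:
        host: prod.example.com
        user: root
        port: 22
        identity_file: /etc/zrepl/ssh/identity

pull:
  prod_backup:
    from: prod
    mapping: {
      "tank/var/db": "storage/backups/prod/var/db"
    }
    initial_repl_policy: most_recent
    repeat:
      interval: 10m

prune:
  keep_backups:
    policy: grid
    grid: 1x1h(keep=all) | 24x1h | 7x1d
    dataset_filter: {
      "storage/backups/prod/var/db": "ok"
    }
    snapshot_filter:
      prefix: zrepl_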

cmd/config_parse.go (new file, +177)
@@ -0,0 +1,177 @@
package cmd
import (
"io/ioutil"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
yaml "gopkg.in/yaml.v2"
"fmt"
)
func ParseConfig(path string) (config *Config, err error) {
var i interface{}
var bytes []byte
if bytes, err = ioutil.ReadFile(path); err != nil {
err = errors.WithStack(err)
return
}
if err = yaml.Unmarshal(bytes, &i); err != nil {
err = errors.WithStack(err)
return
}
return parseConfig(i)
}
func parseConfig(i interface{}) (c *Config, err error) {
var jm map[string]map[string]interface{}
if err := mapstructure.Decode(i, &jm); err != nil {
return nil, errors.Wrap(err, "config must be a dict with job name as key and jobs as values")
}
c = &Config{
Jobs: make(map[string]Job, len(jm)),
}
for name := range jm {
c.Jobs[name], err = parseJob(name, jm[name])
if err != nil {
err = errors.Wrapf(err, "cannot parse job '%s'", name)
return nil, err
}
}
return c, nil
}
func parseJob(name string, i map[string]interface{}) (j Job, err error) {
jobtype_i, ok := i["type"]
if !ok {
err = errors.New("must have field 'type'")
return nil, err
}
jobtype_str, ok := jobtype_i.(string)
if !ok {
err = errors.New("'type' field must have type string")
return nil, err
}
switch jobtype_str {
case "pull":
return parsePullJob(name, i)
case "source":
return parseSourceJob(name, i)
case "local":
return parseLocalJob(name, i)
default:
return nil, errors.Errorf("unknown job type '%s'", jobtype_str)
}
panic("implementation error")
return nil, nil
}
func parseConnect(i map[string]interface{}) (c RPCConnecter, err error) {
type_i, ok := i["type"]
if !ok {
err = errors.New("must have field 'type'")
return
}
type_str, ok := type_i.(string)
if !ok {
err = errors.New("'type' field must have type string")
return nil, err
}
switch type_str {
case "ssh+stdinserver":
return parseSSHStdinserverConnecter(i)
default:
return nil, errors.Errorf("unknown connection type '%s'", type_str)
}
panic("implementation error")
return
}
func parseInitialReplPolicy(v interface{}, defaultPolicy InitialReplPolicy) (p InitialReplPolicy, err error) {
s, ok := v.(string)
if !ok {
goto err
}
switch {
case s == "":
p = defaultPolicy
case s == "most_recent":
p = InitialReplPolicyMostRecent
case s == "all":
p = InitialReplPolicyAll
default:
goto err
}
return
err:
err = errors.New(fmt.Sprintf("expected InitialReplPolicy, got %#v", v))
return
}
func parsePrunePolicy(v map[string]interface{}) (p PrunePolicy, err error) {
policyName, ok := v["policy"]
if !ok {
err = errors.Errorf("policy name not specified")
return
}
switch policyName {
case "grid":
return parseGridPrunePolicy(v)
default:
err = errors.Errorf("unknown policy '%s'", policyName)
return
}
panic("implementation error")
return
}
func parseAuthenticatedChannelListenerFactory(v map[string]interface{}) (p AuthenticatedChannelListenerFactory, err error) {
t, ok := v["type"]
if !ok {
err = errors.Errorf("must specify 'type' field")
return
}
s, ok := t.(string)
if !ok {
err = errors.Errorf("'type' must be a string")
return
}
switch s {
case "stdinserver":
return parseStdinserverListenerFactory(v)
default:
err = errors.Errorf("unknown type '%s'", s)
return
}
panic("implementation error")
return
}

cmd/config_prune_grid.go (new file, +203)
@@ -0,0 +1,203 @@
package cmd
import (
"github.com/mitchellh/mapstructure"
"fmt"
"time"
. "github.com/zrepl/zrepl/util"
"github.com/pkg/errors"
"regexp"
"github.com/zrepl/zrepl/zfs"
"sort"
"strconv"
"strings"
)
type GridPrunePolicy struct {
RetentionGrid *RetentionGrid
}
type retentionGridAdaptor struct {
zfs.FilesystemVersion
}
func (a retentionGridAdaptor) Date() time.Time {
return a.Creation
}
func (a retentionGridAdaptor) LessThan(b RetentionGridEntry) bool {
return a.CreateTXG < b.(retentionGridAdaptor).CreateTXG
}
func (p *GridPrunePolicy) Prune(fs zfs.DatasetPath, versions []zfs.FilesystemVersion) (keep, remove []zfs.FilesystemVersion, err error) {
// Build adaptors for retention grid
adaptors := make([]RetentionGridEntry, len(versions))
for fsv := range versions {
adaptors[fsv] = retentionGridAdaptor{versions[fsv]}
}
sort.SliceStable(adaptors, func(i, j int) bool {
return adaptors[i].LessThan(adaptors[j])
})
now := adaptors[len(adaptors)-1].Date()
// Evaluate retention grid
keepa, removea := p.RetentionGrid.FitEntries(now, adaptors)
// Revert adaptors
keep = make([]zfs.FilesystemVersion, len(keepa))
for i := range keepa {
keep[i] = keepa[i].(retentionGridAdaptor).FilesystemVersion
}
remove = make([]zfs.FilesystemVersion, len(removea))
for i := range removea {
remove[i] = removea[i].(retentionGridAdaptor).FilesystemVersion
}
return
}
func parseGridPrunePolicy(e map[string]interface{}) (p *GridPrunePolicy, err error) {
var i struct {
Grid string
}
if err = mapstructure.Decode(e, &i); err != nil {
err = errors.Wrapf(err, "mapstructure error")
return
}
p = &GridPrunePolicy{}
// Parse grid policy
intervals, err := parseRetentionGridIntervalsString(i.Grid)
if err != nil {
err = fmt.Errorf("cannot parse retention grid: %s", err)
return
}
// Assert intervals are of increasing length (not necessarily required, but indicates config mistake)
lastDuration := time.Duration(0)
for i := range intervals {
if intervals[i].Length < lastDuration {
err = fmt.Errorf("retention grid interval length must be monotonically increasing:"+
"interval %d is shorter than %d", i+1, i)
return
} else {
lastDuration = intervals[i].Length
}
}
p.RetentionGrid = NewRetentionGrid(intervals)
return
}
var durationStringRegex *regexp.Regexp = regexp.MustCompile(`^\s*(\d+)\s*(s|m|h|d|w)\s*$`)
func parseDuration(e string) (d time.Duration, err error) {
comps := durationStringRegex.FindStringSubmatch(e)
if len(comps) != 3 {
err = fmt.Errorf("does not match regex: %s %#v", e, comps)
return
}
durationFactor, err := strconv.ParseInt(comps[1], 10, 64)
if err != nil {
return
}
var durationUnit time.Duration
switch comps[2] {
case "s":
durationUnit = time.Second
case "m":
durationUnit = time.Minute
case "h":
durationUnit = time.Hour
case "d":
durationUnit = 24 * time.Hour
case "w":
durationUnit = 24 * 7 * time.Hour
default:
err = fmt.Errorf("contains unknown time unit '%s'", comps[2])
return
}
d = time.Duration(durationFactor) * durationUnit
return
}
var retentionStringIntervalRegex *regexp.Regexp = regexp.MustCompile(`^\s*(\d+)\s*x\s*([^\(]+)\s*(\((.*)\))?\s*$`)
func parseRetentionGridIntervalString(e string) (intervals []RetentionInterval, err error) {
comps := retentionStringIntervalRegex.FindStringSubmatch(e)
if comps == nil {
err = fmt.Errorf("retention string does not match expected format")
return
}
times, err := strconv.Atoi(comps[1])
if err != nil {
return nil, err
} else if times <= 0 {
return nil, fmt.Errorf("contains factor <= 0")
}
duration, err := parseDuration(comps[2])
if err != nil {
return nil, err
}
keepCount := 1
if comps[3] != "" {
// Decompose key=value, comma separated
// For now, only keep_count is supported
re := regexp.MustCompile(`^\s*keep=(.+)\s*$`)
res := re.FindStringSubmatch(comps[4])
if res == nil || len(res) != 2 {
err = fmt.Errorf("interval parameter contains unknown parameters")
return
}
if res[1] == "all" {
keepCount = RetentionGridKeepCountAll
} else {
keepCount, err = strconv.Atoi(res[1])
if err != nil {
err = fmt.Errorf("cannot parse keep_count value")
return
}
}
}
intervals = make([]RetentionInterval, times)
for i := range intervals {
intervals[i] = RetentionInterval{
Length: duration,
KeepCount: keepCount,
}
}
return
}
func parseRetentionGridIntervalsString(s string) (intervals []RetentionInterval, err error) {
ges := strings.Split(s, "|")
intervals = make([]RetentionInterval, 0, 7*len(ges))
for intervalIdx, e := range ges {
parsed, err := parseRetentionGridIntervalString(e)
if err != nil {
return nil, fmt.Errorf("cannot parse interval %d of %d: %s: %s", intervalIdx+1, len(ges), err, strings.TrimSpace(e))
}
intervals = append(intervals, parsed...)
}
return
}
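Taken together, the two regexes above define a small grammar for retention grids: '|'-separated intervals of the form "<factor>x<duration>(keep=<count>|all)", with durations in s/m/h/d/w. A hedged usage sketch (the grid string is an invented example; the function and types are from this file):

// "1x1h(keep=all) | 24x1h | 7x1d | 4x1w" expands left-to-right into
// 1+24+7+4 = 36 RetentionInterval entries: the first hour keeps every
// snapshot, then 24 hourly, 7 daily, and 4 weekly intervals keep one
// snapshot each (keep defaults to 1; keep=all maps to RetentionGridKeepCountAll).
func exampleGrid() {
	intervals, err := parseRetentionGridIntervalsString("1x1h(keep=all) | 24x1h | 7x1d | 4x1w")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(intervals)) // 36
}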

cmd/config_serve.go (new file, +27)
@@ -0,0 +1,27 @@
package cmd
import "github.com/pkg/errors"
type StdinserverListenerFactory struct {
ClientIdentity string
}
func (StdinserverListenerFactory) Listen() AuthenticatedChannelListener {
panic("implement me")
}
func parseStdinserverListenerFactory(i map[string]interface{}) (f *StdinserverListenerFactory, err error) {
ci, ok := i["client_identity"]
if !ok {
err = errors.Errorf("must specify 'client_identity'")
return
}
cs, ok := ci.(string)
if !ok {
err = errors.Errorf("must specify 'client_identity' as string, got %T", cs)
return
}
f = &StdinserverListenerFactory{ClientIdentity: cs}
return
}

(modified file)
@@ -9,9 +9,22 @@ import (
 	"github.com/zrepl/zrepl/zfs"
 )

-func TestSampleConfigFileIsParsedWithoutErrors(t *testing.T) {
-	_, err := ParseConfig("./sampleconf/zrepl.yml")
-	assert.Nil(t, err)
+func TestSampleConfigsAreParsedWithoutErrors(t *testing.T) {
+	paths := []string{
+		"./sampleconf/localbackup/host1.yml",
+		"./sampleconf/pullbackup/backuphost.yml",
+		"./sampleconf/pullbackup/productionhost.yml",
+	}
+
+	for _, p := range paths {
+		_, err := ParseConfig(p)
+		if err != nil {
+			t.Errorf("error parsing %s:\n%+v", p, err)
+		}
+	}
 }

 func TestParseRetentionGridStringParsing(t *testing.T) {

(modified file)
@@ -2,74 +2,56 @@ package cmd

 import (
 	"github.com/spf13/cobra"
-	"github.com/zrepl/zrepl/jobrun"
+	"fmt"
+	"sync"
 )

-var daemonArgs struct {
-	noPrune    bool
-	noAutosnap bool
-	noPull     bool
-}
-
 // daemonCmd represents the daemon command
 var daemonCmd = &cobra.Command{
 	Use:   "daemon",
-	Short: "run zrepl as a daemon",
+	Short: "start daemon",
 	Run:   doDaemon,
 }

 func init() {
 	RootCmd.AddCommand(daemonCmd)
-	daemonCmd.Flags().BoolVar(&daemonArgs.noPrune, "noprune", false, "don't run prune jobs")
-	daemonCmd.Flags().BoolVar(&daemonArgs.noAutosnap, "noautosnap", false, "don't run autosnap jobs")
-	daemonCmd.Flags().BoolVar(&daemonArgs.noPull, "nopull", false, "don't run pull jobs")
+}
+
+type jobLogger struct {
+	MainLog Logger
+	JobName string
+}
+
+func (l jobLogger) Printf(format string, v ...interface{}) {
+	l.MainLog.Printf(fmt.Sprintf("[%s]: %s", l.JobName, format), v...)
+}
+
+type Job interface {
+	JobName() string
+	JobDo(log Logger) (err error)
 }

 func doDaemon(cmd *cobra.Command, args []string) {
-	r := jobrun.NewJobRunner(log)
-	rc := make(chan jobrun.JobEvent)
-	r.SetNotificationChannel(rc)
-	go r.Run()
-
-	if !daemonArgs.noAutosnap {
-		for name := range conf.Autosnaps {
-			as := conf.Autosnaps[name]
-			r.AddJob(as)
-		}
-	}
-
-	if !daemonArgs.noPrune {
-		for name := range conf.Prunes {
-			p := conf.Prunes[name]
-			r.AddJob(p)
-		}
-	}
-
-	if !daemonArgs.noPull {
-		for name := range conf.Pulls {
-			p := conf.Pulls[name]
-			r.AddJob(p)
-		}
-	}
-
-	for {
-		event := <-rc
-		// log.Printf("received event: %T", event)
-		switch e := (event).(type) {
-		case jobrun.JobFinishedEvent:
-			//log.Printf("[%s] job run finished after %s", e.Job.JobName(), e.Result.RunTime())
-			if e.Result.Error != nil {
-				log.Printf("[%s] exited with error: %s", e.Result.Error)
-			}
-		case jobrun.JobScheduledEvent:
-			//log.Printf("[%s] scheduled to run at %s", e.Job.JobName(), e.DueAt)
-		case jobrun.JobrunIdleEvent:
-			//log.Printf("sleeping until %v", e.SleepUntil)
-		case jobrun.JobrunFinishedEvent:
-			//log.Printf("no more jobs")
-			break
-		}
-	}
+	var wg sync.WaitGroup
+
+	log.Printf("starting jobs from config")
+
+	for _, job := range conf.Jobs {
+		log.Printf("starting job %s", job.JobName())
+		logger := jobLogger{log, job.JobName()}
+		wg.Add(1)
+		go func(j Job) {
+			defer wg.Done()
+			err := job.JobDo(logger)
+			if err != nil {
+				logger.Printf("returned error: %+v", err)
+			}
+		}(job)
+	}
+
+	log.Printf("waiting for jobs from config to finish")
+	wg.Wait()
 }

(deleted file)
@@ -1,43 +0,0 @@
package cmd
import (
"time"
"github.com/zrepl/zrepl/jobrun"
)
func (p *Pull) JobName() string {
return p.jobName
}
func (p *Pull) JobDo(log jobrun.Logger) (err error) {
return jobPull(p, log)
}
func (p *Pull) JobRepeatStrategy() jobrun.RepeatStrategy {
return p.RepeatStrategy
}
func (a *Autosnap) JobName() string {
return a.jobName
}
func (a *Autosnap) JobDo(log jobrun.Logger) (err error) {
return doAutosnap(AutosnapContext{a}, log)
}
func (a *Autosnap) JobRepeatStrategy() jobrun.RepeatStrategy {
return a.Interval
}
func (p *Prune) JobName() string {
return p.jobName
}
func (p *Prune) JobDo(log jobrun.Logger) (err error) {
return doPrune(PruneContext{p, time.Now(), false}, log)
}
func (p *Prune) JobRepeatStrategy() jobrun.RepeatStrategy {
return p.Repeat
}

(modified file)
@@ -27,7 +27,7 @@ type Logger interface {

 // global state / facilities
 var (
-	conf     Config
+	conf     *Config
 	logFlags int = golog.LUTC | golog.Ldate | golog.Ltime
 	logOut   io.Writer
 	log      Logger

(deleted file)
@@ -1,140 +0,0 @@
package cmd
import (
"fmt"
"os"
"sort"
"time"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/util"
"github.com/zrepl/zrepl/zfs"
)
var pruneArgs struct {
job string
dryRun bool
}
var PruneCmd = &cobra.Command{
Use: "prune",
Short: "perform pruning",
Run: cmdPrune,
}
func init() {
PruneCmd.Flags().StringVar(&pruneArgs.job, "job", "", "job to run")
PruneCmd.Flags().BoolVarP(&pruneArgs.dryRun, "dryrun", "n", false, "dry run")
RootCmd.AddCommand(PruneCmd)
}
func cmdPrune(cmd *cobra.Command, args []string) {
if len(args) < 1 {
log.Printf("must specify exactly one job as positional argument")
os.Exit(1)
}
job, ok := conf.Prunes[args[0]]
if !ok {
log.Printf("could not find prune job: %s", args[0])
os.Exit(1)
}
log.Printf("Beginning prune job:\n%s", job)
ctx := PruneContext{job, time.Now(), pruneArgs.dryRun}
err := doPrune(ctx, log)
if err != nil {
log.Printf("Prune job failed with error: %s", err)
os.Exit(1)
}
}
type PruneContext struct {
Prune *Prune
Now time.Time
DryRun bool
}
type retentionGridAdaptor struct {
zfs.FilesystemVersion
}
func (a retentionGridAdaptor) Date() time.Time {
return a.Creation
}
func (a retentionGridAdaptor) LessThan(b util.RetentionGridEntry) bool {
return a.CreateTXG < b.(retentionGridAdaptor).CreateTXG
}
func doPrune(ctx PruneContext, log Logger) error {
if ctx.DryRun {
log.Printf("doing dry run")
}
prune := ctx.Prune
// ZFSListSnapsFiltered --> todo can extend fsfilter or need new? Have already something per fs
// Dedicated snapshot object? Adaptor object to FilesystemVersion?
filesystems, err := zfs.ZFSListMapping(prune.DatasetFilter)
if err != nil {
return fmt.Errorf("error applying filesystem filter: %s", err)
}
for _, fs := range filesystems {
fsversions, err := zfs.ZFSListFilesystemVersions(fs, prune.SnapshotFilter)
if err != nil {
return fmt.Errorf("error listing filesytem versions of %s: %s", fs, err)
}
if len(fsversions) == 0 {
continue
}
adaptors := make([]util.RetentionGridEntry, len(fsversions))
for fsv := range fsversions {
adaptors[fsv] = retentionGridAdaptor{fsversions[fsv]}
}
sort.SliceStable(adaptors, func(i, j int) bool {
return adaptors[i].LessThan(adaptors[j])
})
ctx.Now = adaptors[len(adaptors)-1].Date()
describe := func(a retentionGridAdaptor) string {
timeSince := a.Creation.Sub(ctx.Now)
const day time.Duration = 24 * time.Hour
days := timeSince / day
remainder := timeSince % day
return fmt.Sprintf("%s@%dd%s from now", a.ToAbsPath(fs), days, remainder)
}
keep, remove := prune.RetentionPolicy.FitEntries(ctx.Now, adaptors)
for _, a := range remove {
r := a.(retentionGridAdaptor)
log.Printf("remove %s", describe(r))
// do echo what we'll do and exec zfs destroy if not dry run
// special handling for EBUSY (zfs hold)
// error handling for clones? just echo to cli, skip over, and exit with non-zero status code (we're idempotent)
if !ctx.DryRun {
err := zfs.ZFSDestroyFilesystemVersion(fs, r.FilesystemVersion)
if err != nil {
// handle
log.Printf("error: %s", err)
}
}
}
for _, a := range keep {
r := a.(retentionGridAdaptor)
log.Printf("would keep %s", describe(r))
}
}
return nil
}

(modified file)
@@ -3,75 +3,13 @@ package cmd

 import (
 	"fmt"
 	"io"
-	"os"
 	"time"
-
-	"github.com/spf13/cobra"
-	"github.com/zrepl/zrepl/jobrun"
 	"github.com/zrepl/zrepl/rpc"
 	"github.com/zrepl/zrepl/util"
 	"github.com/zrepl/zrepl/zfs"
 )

-var runArgs struct {
-	job  string
-	once bool
-}
-
-var PushCmd = &cobra.Command{
-	Use:   "push",
-	Short: "run push job (first positional argument)",
-	Run:   cmdPush,
-}
-
-var PullCmd = &cobra.Command{
-	Use:   "pull",
-	Short: "run pull job (first positional argument)",
-	Run:   cmdPull,
-}
-
-func init() {
-	RootCmd.AddCommand(PushCmd)
-	RootCmd.AddCommand(PullCmd)
-}
-
-func cmdPush(cmd *cobra.Command, args []string) {
-	if len(args) != 1 {
-		log.Printf("must specify exactly one job as positional argument")
-		os.Exit(1)
-	}
-	job, ok := conf.Pushs[args[0]]
-	if !ok {
-		log.Printf("could not find push job %s", args[0])
-		os.Exit(1)
-	}
-	if err := jobPush(job, log); err != nil {
-		log.Printf("error doing push: %s", err)
-		os.Exit(1)
-	}
-}
-
-func cmdPull(cmd *cobra.Command, args []string) {
-	if len(args) != 1 {
-		log.Printf("must specify exactly one job as positional argument")
-		os.Exit(1)
-	}
-	job, ok := conf.Pulls[args[0]]
-	if !ok {
-		log.Printf("could not find pull job %s", args[0])
-		os.Exit(1)
-	}
-	if err := jobPull(job, log); err != nil {
-		log.Printf("error doing pull: %s", err)
-		os.Exit(1)
-	}
-}
-
 type localPullACL struct{}

 func (a localPullACL) Filter(p *zfs.DatasetPath) (pass bool, err error) {
@@ -89,47 +27,6 @@ const (
 	InitialReplPolicyAll InitialReplPolicy = "all"
 )

-func jobPull(pull *Pull, log jobrun.Logger) (err error) {
-	var remote rpc.RPCClient
-
-	if remote, err = pull.From.Transport.Connect(log); err != nil {
-		return
-	}
-
-	defer closeRPCWithTimeout(log, remote, time.Second*10, "")
-
-	return doPull(PullContext{remote, log, pull.Mapping, pull.InitialReplPolicy})
-}
-
-func jobPush(push *Push, log jobrun.Logger) (err error) {
-	if _, ok := push.To.Transport.(LocalTransport); ok {
-		panic("no support for local pushs")
-	}
-
-	var remote rpc.RPCClient
-	if remote, err = push.To.Transport.Connect(log); err != nil {
-		return err
-	}
-
-	defer closeRPCWithTimeout(log, remote, time.Second*10, "")
-
-	log.Printf("building handler for PullMeRequest")
-	handler := Handler{
-		Logger:          log,
-		PullACL:         push.Filter,
-		SinkMappingFunc: nil, // no need for that in the handler for PullMe
-	}
-	log.Printf("handler: %#v", handler)
-
-	panic("no support for push atm")
-
-	log.Printf("push job finished")
-	return
-}
-
 func closeRPCWithTimeout(log Logger, remote rpc.RPCClient, timeout time.Duration, goodbye string) {
 	log.Printf("closing rpc connection")

(deleted file)
@@ -1,75 +0,0 @@
package cmd
import (
"fmt"
"io"
golog "log"
"os"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/sshbytestream"
)
var StdinserverCmd = &cobra.Command{
Use: "stdinserver CLIENT_IDENTITY",
Short: "start in stdin server mode (from authorized_keys file)",
Run: cmdStdinServer,
}
func init() {
RootCmd.AddCommand(StdinserverCmd)
}
func cmdStdinServer(cmd *cobra.Command, args []string) {
var err error
defer func() {
if err != nil {
log.Printf("stdinserver exiting with error: %s", err)
os.Exit(1)
}
}()
if len(args) != 1 || args[0] == "" {
err = fmt.Errorf("must specify client identity as positional argument")
return
}
identity := args[0]
pullACL, ok := conf.PullACLs[identity]
if !ok {
err = fmt.Errorf("could not find PullACL for identity '%s'", identity)
return
}
var sshByteStream io.ReadWriteCloser
if sshByteStream, err = sshbytestream.Incoming(); err != nil {
return
}
sinkMapping := func(identity string) (m DatasetMapping, err error) {
sink, ok := conf.Sinks[identity]
if !ok {
return nil, fmt.Errorf("could not find sink for identity '%s'", identity)
}
return sink, nil
}
sinkLogger := golog.New(logOut, fmt.Sprintf("sink[%s] ", identity), logFlags)
handler := Handler{
Logger: sinkLogger,
SinkMappingFunc: sinkMapping,
PullACL: pullACL,
}
server := rpc.NewServer(sshByteStream)
registerEndpoints(server, handler)
if err = server.Serve(); err != nil {
log.Printf("error serving connection: %s", err)
os.Exit(1)
}
return
}

(modified file)
@@ -47,24 +47,21 @@ func doTestDatasetMapFilter(cmd *cobra.Command, args []string) {
 		os.Exit(1)
 	}
 	n, i := args[0], args[1]

-	jobi, err := conf.resolveJobName(n)
-	if err != nil {
-		log.Printf("%s", err)
+	jobi, ok := conf.Jobs[n]
+	if !ok {
+		log.Printf("no job %s defined in config")
 		os.Exit(1)
 	}

-	var mf DatasetMapFilter
+	var mf *DatasetMapFilter
 	switch j := jobi.(type) {
-	case *Autosnap:
-		mf = j.DatasetFilter
-	case *Prune:
-		mf = j.DatasetFilter
-	case *Pull:
+	case *PullJob:
 		mf = j.Mapping
-	case *Push:
-		mf = j.Filter
-	case DatasetMapFilter:
-		mf = j
+	case *SourceJob:
+		mf = j.Datasets
+	case *LocalJob:
+		mf = j.Mapping
 	default:
 		panic("incomplete implementation")
 	}