Remove obsolete cmd/** package + subpackages

Christian Schwarz 2018-09-24 14:48:12 +02:00
parent 1ce0c69e4f
commit 328ac687f6
43 changed files with 0 additions and 4233 deletions

View File

@@ -1,103 +0,0 @@
package cmd
import (
"context"
"fmt"
"io"
"net"
"strings"
"time"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/util"
)
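// logNetConnConnecter wraps a streamrpc.Connecter and mirrors the resulting
// connection's reads and writes into the configured dump files.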
type logNetConnConnecter struct {
streamrpc.Connecter
ReadDump, WriteDump string
}
var _ streamrpc.Connecter = logNetConnConnecter{}
func (l logNetConnConnecter) Connect(ctx context.Context) (net.Conn, error) {
conn, err := l.Connecter.Connect(ctx)
if err != nil {
return nil, err
}
return util.NewNetConnLogger(conn, l.ReadDump, l.WriteDump)
}
type logListenerFactory struct {
ListenerFactory
ReadDump, WriteDump string
}
var _ ListenerFactory = logListenerFactory{}
type logListener struct {
net.Listener
ReadDump, WriteDump string
}
var _ net.Listener = logListener{}
func (m logListenerFactory) Listen() (net.Listener, error) {
l, err := m.ListenerFactory.Listen()
if err != nil {
return nil, err
}
return logListener{l, m.ReadDump, m.WriteDump}, nil
}
func (l logListener) Accept() (net.Conn, error) {
conn, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return util.NewNetConnLogger(conn, l.ReadDump, l.WriteDump)
}
type netsshAddr struct{}
func (netsshAddr) Network() string { return "netssh" }
func (netsshAddr) String() string { return "???" }
type netsshConnToNetConnAdatper struct {
io.ReadWriteCloser // works for both netssh.SSHConn and netssh.ServeConn
}
func (netsshConnToNetConnAdatper) LocalAddr() net.Addr { return netsshAddr{} }
func (netsshConnToNetConnAdatper) RemoteAddr() net.Addr { return netsshAddr{} }
func (netsshConnToNetConnAdatper) SetDeadline(t time.Time) error { return nil }
func (netsshConnToNetConnAdatper) SetReadDeadline(t time.Time) error { return nil }
func (netsshConnToNetConnAdatper) SetWriteDeadline(t time.Time) error { return nil }
type streamrpcLogAdaptor = twoClassLogAdaptor
type replicationLogAdaptor = twoClassLogAdaptor
type twoClassLogAdaptor struct {
logger.Logger
}
var _ streamrpc.Logger = twoClassLogAdaptor{}
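// Errorf special-cases the common "msg: %s" format with a single error argument
// so the error is logged as a structured field instead of being flattened into the message.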
func (a twoClassLogAdaptor) Errorf(fmtStr string, args ...interface{}) {
const errorSuffix = ": %s"
if len(args) == 1 {
if err, ok := args[0].(error); ok && strings.HasSuffix(fmtStr, errorSuffix) {
msg := strings.TrimSuffix(fmtStr, errorSuffix)
a.WithError(err).Error(msg)
return
}
}
a.Logger.Error(fmt.Sprintf(fmtStr, args...))
}
func (a twoClassLogAdaptor) Infof(fmtStr string, args ...interface{}) {
a.Logger.Info(fmt.Sprintf(fmtStr, args...))
}

View File

@@ -1,192 +0,0 @@
package cmd
import (
"context"
"fmt"
"github.com/zrepl/zrepl/zfs"
"sort"
"time"
)
type IntervalAutosnap struct {
DatasetFilter zfs.DatasetFilter
Prefix string
SnapshotInterval time.Duration
}
func (a *IntervalAutosnap) filterFilesystems(ctx context.Context) (fss []*zfs.DatasetPath, stop bool) {
fss, err := zfs.ZFSListMapping(a.DatasetFilter)
stop = err != nil
if err != nil {
getLogger(ctx).WithError(err).Error("cannot list datasets")
}
if len(fss) == 0 {
getLogger(ctx).Warn("no filesystem matching filesystem filter")
}
return fss, stop
}
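// findSyncPoint returns the earliest point in time at which any of the given
// filesystems is due for its next snapshot, so that all of them can then be
// snapshotted in lockstep.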
func (a *IntervalAutosnap) findSyncPoint(log Logger, fss []*zfs.DatasetPath) (syncPoint time.Time, err error) {
type snapTime struct {
ds *zfs.DatasetPath
time time.Time
}
if len(fss) == 0 {
return time.Now(), nil
}
snaptimes := make([]snapTime, 0, len(fss))
now := time.Now()
log.Debug("examine filesystem state")
for _, d := range fss {
l := log.WithField("fs", d.ToString())
fsvs, err := zfs.ZFSListFilesystemVersions(d, NewPrefixFilter(a.Prefix))
if err != nil {
l.WithError(err).Error("cannot list filesystem versions")
continue
}
if len(fsvs) <= 0 {
l.WithField("prefix", a.Prefix).Info("no filesystem versions with prefix")
continue
}
// Sort versions by creation
sort.SliceStable(fsvs, func(i, j int) bool {
return fsvs[i].CreateTXG < fsvs[j].CreateTXG
})
latest := fsvs[len(fsvs)-1]
l.WithField("creation", latest.Creation).
Debug("found latest snapshot")
since := now.Sub(latest.Creation)
if since < 0 {
l.WithField("snapshot", latest.Name).
WithField("creation", latest.Creation).
Error("snapshot is from the future")
continue
}
next := now
if since < a.SnapshotInterval {
next = latest.Creation.Add(a.SnapshotInterval)
}
snaptimes = append(snaptimes, snapTime{d, next})
}
if len(snaptimes) == 0 {
snaptimes = append(snaptimes, snapTime{nil, now})
}
sort.Slice(snaptimes, func(i, j int) bool {
return snaptimes[i].time.Before(snaptimes[j].time)
})
return snaptimes[0].time, nil
}
func (a *IntervalAutosnap) waitForSyncPoint(ctx context.Context, syncPoint time.Time) {
const LOG_TIME_FMT string = time.ANSIC
getLogger(ctx).
WithField("sync_point", syncPoint.Format(LOG_TIME_FMT)).
Info("wait for sync point")
select {
case <-ctx.Done():
getLogger(ctx).WithError(ctx.Err()).Info("context done")
return
case <-time.After(syncPoint.Sub(time.Now())):
}
}
func (a *IntervalAutosnap) syncUpRun(ctx context.Context, didSnaps chan struct{}) (stop bool) {
fss, stop := a.filterFilesystems(ctx)
if stop {
return true
}
syncPoint, err := a.findSyncPoint(getLogger(ctx), fss)
if err != nil {
return true
}
a.waitForSyncPoint(ctx, syncPoint)
getLogger(ctx).Debug("snapshot all filesystems to enable further snaps in lockstep")
a.doSnapshots(ctx, didSnaps)
return false
}
func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) {
log := getLogger(ctx)
if a.syncUpRun(ctx, didSnaps) {
log.Error("stoppping autosnap after error in sync up")
return
}
// task drops back to idle here
log.Debug("setting up ticker in SnapshotInterval")
ticker := time.NewTicker(a.SnapshotInterval)
for {
select {
case <-ctx.Done():
ticker.Stop()
log.WithError(ctx.Err()).Info("context done")
return
case <-ticker.C:
a.doSnapshots(ctx, didSnaps)
}
}
}
func (a *IntervalAutosnap) doSnapshots(ctx context.Context, didSnaps chan struct{}) {
log := getLogger(ctx)
// don't cache the result from previous run in case the user added
// a new dataset in the meantime
ds, stop := a.filterFilesystems(ctx)
if stop {
return
}
// TODO channel programs -> allow a little jitter?
for _, d := range ds {
suffix := time.Now().In(time.UTC).Format("20060102_150405_000")
snapname := fmt.Sprintf("%s%s", a.Prefix, suffix)
l := log.
WithField("fs", d.ToString()).
WithField("snapname", snapname)
l.Info("create snapshot")
err := zfs.ZFSSnapshot(d, snapname, false)
if err != nil {
l.WithError(err).Error("cannot create snapshot")
}
l.Info("create corresponding bookmark")
err = zfs.ZFSBookmark(d, snapname, snapname)
if err != nil {
l.WithError(err).Error("cannot create bookmark")
}
}
select {
case didSnaps <- struct{}{}:
default:
log.Error("warning: callback channel is full, discarding")
}
}

View File

@@ -1,29 +0,0 @@
package cmd
import (
"fmt"
"github.com/spf13/cobra"
"os"
)
var bashcompCmd = &cobra.Command{
Use: "bashcomp path/to/out/file",
Short: "generate bash completions",
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 1 {
fmt.Fprintf(os.Stderr, "specify exactly one positional agument\n")
cmd.Usage()
os.Exit(1)
}
if err := RootCmd.GenBashCompletionFile(args[0]); err != nil {
fmt.Fprintf(os.Stderr, "error generating bash completion: %s", err)
os.Exit(1)
}
},
Hidden: true,
}
func init() {
RootCmd.AddCommand(bashcompCmd)
}

View File

@@ -1,94 +0,0 @@
package cmd
import (
"net"
"fmt"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/zfs"
)
type Config struct {
Global Global
Jobs map[string]Job
}
func (c *Config) LookupJob(name string) (j Job, err error) {
j, ok := c.Jobs[name]
if !ok {
return nil, errors.Errorf("job '%s' is not defined", name)
}
return j, nil
}
type Global struct {
Serve struct {
Stdinserver struct {
SockDir string
}
}
Control struct {
Sockpath string
}
logging *LoggingConfig
}
type JobDebugSettings struct {
Conn struct {
ReadDump string `mapstructure:"read_dump"`
WriteDump string `mapstructure:"write_dump"`
}
RPC struct {
Log bool
}
}
type ListenerFactory interface {
Listen() (net.Listener, error)
}
type SSHStdinServerConnectDescr struct {
}
type PrunePolicy interface {
// Prune filters versions and decide which to keep and which to remove.
// Prune **does not** implement the actual removal of the versions.
Prune(fs *zfs.DatasetPath, versions []zfs.FilesystemVersion) (keep, remove []zfs.FilesystemVersion, err error)
}
type PruningJob interface {
Pruner(side PrunePolicySide, dryRun bool) (Pruner, error)
}
// A type for constants describing different prune policies of a PruningJob
// This is mostly a special-case for LocalJob, which is the only job that has two prune policies
// instead of one.
// It implements github.com/spf13/pflag.Value to be used as CLI flag for the test subcommand
type PrunePolicySide string
const (
PrunePolicySideDefault PrunePolicySide = ""
PrunePolicySideLeft PrunePolicySide = "left"
PrunePolicySideRight PrunePolicySide = "right"
)
func (s *PrunePolicySide) String() string {
return string(*s)
}
func (s *PrunePolicySide) Set(news string) error {
p := PrunePolicySide(news)
switch p {
case PrunePolicySideRight:
fallthrough
case PrunePolicySideLeft:
*s = p
default:
return errors.Errorf("must be either %s or %s", PrunePolicySideLeft, PrunePolicySideRight)
}
return nil
}
func (s *PrunePolicySide) Type() string {
return fmt.Sprintf("%s | %s", PrunePolicySideLeft, PrunePolicySideRight)
}

View File

@@ -1,119 +0,0 @@
package cmd
import (
"crypto/tls"
"net"
"context"
"github.com/jinzhu/copier"
"github.com/pkg/errors"
"github.com/problame/go-netssh"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/cmd/tlsconf"
"github.com/zrepl/zrepl/config"
"time"
)
type SSHStdinserverConnecter struct {
Host string
User string
Port uint16
IdentityFile string
TransportOpenCommand []string
SSHCommand string
Options []string
dialTimeout time.Duration
}
var _ streamrpc.Connecter = &SSHStdinserverConnecter{}
func parseSSHStdinserverConnecter(in config.SSHStdinserverConnect) (c *SSHStdinserverConnecter, err error) {
c = &SSHStdinserverConnecter{
Host: in.Host,
User: in.User,
Port: in.Port,
IdentityFile: in.IdentityFile,
SSHCommand: in.SSHCommand,
Options: in.Options,
dialTimeout: in.DialTimeout,
}
return
}
type netsshConnToConn struct{ *netssh.SSHConn }
var _ net.Conn = netsshConnToConn{}
func (netsshConnToConn) SetDeadline(dl time.Time) error { return nil }
func (netsshConnToConn) SetReadDeadline(dl time.Time) error { return nil }
func (netsshConnToConn) SetWriteDeadline(dl time.Time) error { return nil }
func (c *SSHStdinserverConnecter) Connect(dialCtx context.Context) (net.Conn, error) {
var endpoint netssh.Endpoint
if err := copier.Copy(&endpoint, c); err != nil {
return nil, errors.WithStack(err)
}
dialCtx, dialCancel := context.WithTimeout(dialCtx, c.dialTimeout) // context.TODO tied to error handling below
defer dialCancel()
nconn, err := netssh.Dial(dialCtx, endpoint)
if err != nil {
if err == context.DeadlineExceeded {
err = errors.Errorf("dial_timeout of %s exceeded", c.dialTimeout)
}
return nil, err
}
return netsshConnToConn{nconn}, nil
}
type TCPConnecter struct {
Address string
dialer net.Dialer
}
func parseTCPConnecter(in config.TCPConnect) (*TCPConnecter, error) {
dialer := net.Dialer{
Timeout: in.DialTimeout,
}
return &TCPConnecter{in.Address, dialer}, nil
}
func (c *TCPConnecter) Connect(dialCtx context.Context) (conn net.Conn, err error) {
return c.dialer.DialContext(dialCtx, "tcp", c.Address)
}
type TLSConnecter struct {
Address string
dialer net.Dialer
tlsConfig *tls.Config
}
func parseTLSConnecter(in config.TLSConnect) (*TLSConnecter, error) {
dialer := net.Dialer{
Timeout: in.DialTimeout,
}
ca, err := tlsconf.ParseCAFile(in.Ca)
if err != nil {
return nil, errors.Wrap(err, "cannot parse ca file")
}
cert, err := tls.LoadX509KeyPair(in.Cert, in.Key)
if err != nil {
return nil, errors.Wrap(err, "cannot parse cert/key pair")
}
tlsConfig, err := tlsconf.ClientAuthClient(in.ServerCN, ca, cert)
if err != nil {
return nil, errors.Wrap(err, "cannot build tls config")
}
return &TLSConnecter{in.Address, dialer, tlsConfig}, nil
}
func (c *TLSConnecter) Connect(dialCtx context.Context) (conn net.Conn, err error) {
return tls.DialWithDialer(&c.dialer, "tcp", c.Address, c.tlsConfig)
}

View File

@@ -1,42 +0,0 @@
package cmd
import (
"github.com/pkg/errors"
"github.com/zrepl/zrepl/zfs"
"strings"
)
type AnyFSVFilter struct{}
func (AnyFSVFilter) Filter(t zfs.VersionType, name string) (accept bool, err error) {
return true, nil
}
type PrefixFilter struct {
prefix string
fstype zfs.VersionType
fstypeSet bool // optionals anyone?
}
func NewPrefixFilter(prefix string) *PrefixFilter {
return &PrefixFilter{prefix: prefix}
}
func NewTypedPrefixFilter(prefix string, versionType zfs.VersionType) *PrefixFilter {
return &PrefixFilter{prefix, versionType, true}
}
func parseSnapshotPrefix(i string) (p string, err error) {
if len(i) <= 0 {
err = errors.Errorf("snapshot prefix must not be empty string")
return
}
p = i
return
}
func (f *PrefixFilter) Filter(t zfs.VersionType, name string) (accept bool, err error) {
fstypeMatches := (!f.fstypeSet || t == f.fstype)
prefixMatches := strings.HasPrefix(name, f.prefix)
return fstypeMatches && prefixMatches, nil
}

View File

@@ -1,185 +0,0 @@
package cmd
import (
"time"
"context"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/replication"
"github.com/zrepl/zrepl/zfs"
"sync"
)
type LocalJob struct {
Name string
Mapping *DatasetMapFilter
SnapshotPrefix string
Interval time.Duration
PruneLHS PrunePolicy
PruneRHS PrunePolicy
Debug JobDebugSettings
}
func parseLocalJob(c config.Global, in config.LocalJob) (j *LocalJob, err error) {
var asMap struct {
Mapping map[string]string
SnapshotPrefix string `mapstructure:"snapshot_prefix"`
Interval string
InitialReplPolicy string `mapstructure:"initial_repl_policy"`
PruneLHS map[string]interface{} `mapstructure:"prune_lhs"`
PruneRHS map[string]interface{} `mapstructure:"prune_rhs"`
Debug map[string]interface{}
}
if err = mapstructure.Decode(in, &asMap); err != nil {
err = errors.Wrap(err, "mapstructure error")
return nil, err
}
j = &LocalJob{Name: name}
if j.Mapping, err = parseDatasetMapFilter(asMap.Mapping, false); err != nil {
return
}
if j.SnapshotPrefix, err = parseSnapshotPrefix(asMap.SnapshotPrefix); err != nil {
return
}
if j.Interval, err = parsePostitiveDuration(asMap.Interval); err != nil {
err = errors.Wrap(err, "cannot parse interval")
return
}
if j.PruneLHS, err = parsePrunePolicy(asMap.PruneLHS, true); err != nil {
err = errors.Wrap(err, "cannot parse 'prune_lhs'")
return
}
if j.PruneRHS, err = parsePrunePolicy(asMap.PruneRHS, false); err != nil {
err = errors.Wrap(err, "cannot parse 'prune_rhs'")
return
}
if err = mapstructure.Decode(asMap.Debug, &j.Debug); err != nil {
err = errors.Wrap(err, "cannot parse 'debug'")
return
}
return
}
func (j *LocalJob) JobName() string {
return j.Name
}
func (j *LocalJob) JobType() JobType { return JobTypeLocal }
func (j *LocalJob) JobStart(ctx context.Context) {
log := getLogger(ctx)
// Allow access to any dataset since we control what mapping
// is passed to the pull routine.
// All local datasets will be passed to its Map() function,
// but only those for which a mapping exists will actually be pulled.
// We can pay this small performance penalty for now.
wildcardMapFilter := NewDatasetMapFilter(1, false)
wildcardMapFilter.Add("<", "<")
sender := endpoint.NewSender(wildcardMapFilter, NewPrefixFilter(j.SnapshotPrefix))
receiver, err := endpoint.NewReceiver(j.Mapping, NewPrefixFilter(j.SnapshotPrefix))
if err != nil {
log.WithError(err).Error("unexpected error setting up local handler")
}
snapper := IntervalAutosnap{
DatasetFilter: j.Mapping.AsFilter(),
Prefix: j.SnapshotPrefix,
SnapshotInterval: j.Interval,
}
plhs, err := j.Pruner(PrunePolicySideLeft, false)
if err != nil {
log.WithError(err).Error("error creating lhs pruner")
return
}
prhs, err := j.Pruner(PrunePolicySideRight, false)
if err != nil {
log.WithError(err).Error("error creating rhs pruner")
return
}
didSnaps := make(chan struct{})
go snapper.Run(WithLogger(ctx, log.WithField(logSubsysField, "snap")), didSnaps)
outer:
for {
select {
case <-ctx.Done():
log.WithError(ctx.Err()).Info("context")
break outer
case <-didSnaps:
log.Debug("finished taking snapshots")
log.Info("starting replication procedure")
}
{
ctx := WithLogger(ctx, log.WithField(logSubsysField, "replication"))
rep := replication.NewReplication()
rep.Drive(ctx, sender, receiver)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
plhs.Run(WithLogger(ctx, log.WithField(logSubsysField, "prune_lhs")))
wg.Done()
}()
wg.Add(1)
go func() {
prhs.Run(WithLogger(ctx, log.WithField(logSubsysField, "prune_rhs")))
wg.Done()
}()
wg.Wait()
}
}
func (j *LocalJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) {
var dsfilter zfs.DatasetFilter
var pp PrunePolicy
switch side {
case PrunePolicySideLeft:
pp = j.PruneLHS
dsfilter = j.Mapping.AsFilter()
case PrunePolicySideRight:
pp = j.PruneRHS
dsfilter, err = j.Mapping.InvertedFilter()
if err != nil {
err = errors.Wrap(err, "cannot invert mapping for prune_rhs")
return
}
default:
err = errors.Errorf("must be either left or right side")
return
}
p = Pruner{
time.Now(),
dryRun,
dsfilter,
j.SnapshotPrefix,
pp,
}
return
}

View File

@@ -1,168 +0,0 @@
package cmd
import (
"os"
"os/signal"
"syscall"
"time"
"context"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/replication"
)
type PullJob struct {
Name string
Connect streamrpc.Connecter
Interval time.Duration
Mapping *DatasetMapFilter
// constructed from mapping during parsing
pruneFilter *DatasetMapFilter
Prune PrunePolicy
rep *replication.Replication
}
func parsePullJob(c config.Global, in config.PullJob) (j *PullJob, err error) {
j = &PullJob{Name: in.Name}
j.Connect, err = parseConnect(in.Replication.Connect)
if err != nil {
err = errors.Wrap(err, "cannot parse 'connect'")
return nil, err
}
j.Interval = in.Replication.Interval
j.Mapping = NewDatasetMapFilter(1, false)
if err := j.Mapping.Add("<", in.Replication.RootDataset); err != nil {
return nil, err
}
j.pruneFilter = NewDatasetMapFilter(1, true)
if err := j.pruneFilter.Add(in.Replication.RootDataset, MapFilterResultOk); err != nil {
return nil, err
}
if j.Prune, err = parsePrunePolicy(in.Pruning, false); err != nil {
err = errors.Wrap(err, "cannot parse prune policy")
return
}
if in.Debug.Conn.ReadDump != "" || in.Debug.Conn.WriteDump != "" {
logConnecter := logNetConnConnecter{
Connecter: j.Connect,
ReadDump: in.Debug.Conn.ReadDump,
WriteDump: in.Debug.Conn.WriteDump,
}
j.Connect = logConnecter
}
return
}
func (j *PullJob) JobName() string { return j.Name }
func (j *PullJob) JobStart(ctx context.Context) {
log := getLogger(ctx)
defer log.Info("exiting")
// j.task is idle here
usr1 := make(chan os.Signal)
signal.Notify(usr1, syscall.SIGUSR1)
defer signal.Stop(usr1)
ticker := time.NewTicker(j.Interval)
for {
begin := time.Now()
j.doRun(ctx)
duration := time.Now().Sub(begin)
if duration > j.Interval {
log.
WithField("actual_duration", duration).
WithField("configured_interval", j.Interval).
Warn("pull run took longer than configured interval")
}
select {
case <-ctx.Done():
log.WithError(ctx.Err()).Info("context")
return
case <-ticker.C:
case <-usr1:
}
}
}
var STREAMRPC_CONFIG = &streamrpc.ConnConfig{ // FIXME oversight and configurability
RxHeaderMaxLen: 4096,
RxStructuredMaxLen: 4096 * 4096,
RxStreamMaxChunkSize: 4096 * 4096,
TxChunkSize: 4096 * 4096,
RxTimeout: streamrpc.Timeout{
Progress: 10 * time.Second,
},
TxTimeout: streamrpc.Timeout{
Progress: 10 * time.Second,
},
}
func (j *PullJob) doRun(ctx context.Context) {
log := getLogger(ctx)
// FIXME
clientConf := &streamrpc.ClientConfig{
ConnConfig: STREAMRPC_CONFIG,
}
client, err := streamrpc.NewClient(j.Connect, clientConf)
defer client.Close()
sender := endpoint.NewRemote(client)
receiver, err := endpoint.NewReceiver(j.Mapping, AnyFSVFilter{})
if err != nil {
log.WithError(err).Error("error creating receiver endpoint")
return
}
{
ctx := replication.WithLogger(ctx, replicationLogAdaptor{log.WithField(logSubsysField, "replication")})
ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{log.WithField(logSubsysField, "rpc")})
ctx = endpoint.WithLogger(ctx, log.WithField(logSubsysField, "endpoint"))
j.rep = replication.NewReplication()
j.rep.Drive(ctx, sender, receiver)
}
client.Close()
{
ctx := WithLogger(ctx, log.WithField(logSubsysField, "prune"))
pruner, err := j.Pruner(PrunePolicySideDefault, false)
if err != nil {
log.WithError(err).Error("error creating pruner")
} else {
pruner.Run(ctx)
}
}
}
func (j *PullJob) Report() *replication.Report {
return j.rep.Report()
}
func (j *PullJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) {
p = Pruner{
time.Now(),
dryRun,
j.pruneFilter,
j.Prune,
}
return
}

View File

@@ -1,172 +0,0 @@
package cmd
import (
"context"
"time"
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/endpoint"
"net"
)
type SourceJob struct {
Name string
Serve ListenerFactory
Filesystems *DatasetMapFilter
SnapshotPrefix string
Interval time.Duration
Prune PrunePolicy
}
func parseSourceJob(c config.Global, in config.SourceJob) (j *SourceJob, err error) {
j = &SourceJob{
Name: in.Name,
Interval: in.Snapshotting.Interval,
}
if j.Serve, err = parseAuthenticatedChannelListenerFactory(c, in.Replication.Serve); err != nil {
return
}
if j.Filesystems, err = parseDatasetMapFilter(in.Replication.Filesystems, true); err != nil {
return
}
if j.SnapshotPrefix, err = parseSnapshotPrefix(in.Snapshotting.SnapshotPrefix); err != nil {
return
}
if j.Prune, err = parsePrunePolicy(in.Pruning, true); err != nil {
err = errors.Wrap(err, "cannot parse 'prune'")
return
}
if in.Debug.Conn.ReadDump != "" || in.Debug.Conn.WriteDump != "" {
logServe := logListenerFactory{
ListenerFactory: j.Serve,
ReadDump: in.Debug.Conn.ReadDump,
WriteDump: in.Debug.Conn.WriteDump,
}
j.Serve = logServe
}
return
}
func (j *SourceJob) JobName() string {
return j.Name
}
func (j *SourceJob) JobType() JobType { return JobTypeSource }
func (j *SourceJob) JobStart(ctx context.Context) {
log := getLogger(ctx)
defer log.Info("exiting")
a := IntervalAutosnap{j.Filesystems, j.SnapshotPrefix, j.Interval}
p, err := j.Pruner(PrunePolicySideDefault, false)
if err != nil {
log.WithError(err).Error("error creating pruner")
return
}
didSnaps := make(chan struct{})
go j.serve(ctx) // logSubsysField set by handleConnection
go a.Run(WithLogger(ctx, log.WithField(logSubsysField, "snap")), didSnaps)
outer:
for {
select {
case <-ctx.Done():
break outer
case <-didSnaps:
p.Run(WithLogger(ctx, log.WithField(logSubsysField, "prune")))
}
}
log.WithError(ctx.Err()).Info("context")
}
func (j *SourceJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) {
p = Pruner{
time.Now(),
dryRun,
j.Filesystems,
j.SnapshotPrefix,
j.Prune,
}
return
}
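// serve accepts connections on the configured listener and handles them
// sequentially until the context is done.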
func (j *SourceJob) serve(ctx context.Context) {
log := getLogger(ctx)
listener, err := j.Serve.Listen()
if err != nil {
getLogger(ctx).WithError(err).Error("error listening")
return
}
type connChanMsg struct {
conn net.Conn
err error
}
connChan := make(chan connChanMsg, 1)
// Serve connections until interrupted or error
outer:
for {
go func() {
rwc, err := listener.Accept()
connChan <- connChanMsg{rwc, err}
}()
select {
case rwcMsg := <-connChan:
if rwcMsg.err != nil {
log.WithError(rwcMsg.err).Error("error accepting connection")
continue
}
j.handleConnection(ctx, rwcMsg.conn)
case <-ctx.Done():
log.WithError(ctx.Err()).Info("context")
break outer
}
}
log.Info("closing listener")
err = listener.Close()
if err != nil {
log.WithError(err).Error("error closing listener")
}
return
}
func (j *SourceJob) handleConnection(ctx context.Context, conn net.Conn) {
log := getLogger(ctx)
log.Info("handling client connection")
senderEP := endpoint.NewSender(j.Filesystems, NewPrefixFilter(j.SnapshotPrefix))
ctx = endpoint.WithLogger(ctx, log.WithField(logSubsysField, "serve"))
ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{log.WithField(logSubsysField, "rpc")})
handler := endpoint.NewHandler(senderEP)
if err := streamrpc.ServeConn(ctx, conn, STREAMRPC_CONFIG, handler.Handle); err != nil {
log.WithError(err).Error("error serving connection")
} else {
log.Info("client closed connection")
}
}

View File

@@ -1,194 +0,0 @@
package cmd
import (
"crypto/tls"
"crypto/x509"
"github.com/mattn/go-isatty"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/cmd/tlsconf"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/logger"
"os"
)
type LoggingConfig struct {
Outlets *logger.Outlets
}
type MetadataFlags int64
const (
MetadataTime MetadataFlags = 1 << iota
MetadataLevel
MetadataNone MetadataFlags = 0
MetadataAll MetadataFlags = ^0
)
func parseLogging(in []config.LoggingOutletEnum) (c *LoggingConfig, err error) {
c = &LoggingConfig{}
c.Outlets = logger.NewOutlets()
if len(in) == 0 {
// Default config
out := WriterOutlet{&HumanFormatter{}, os.Stdout}
c.Outlets.Add(out, logger.Warn)
return
}
var syslogOutlets, stdoutOutlets int
for lei, le := range in {
outlet, minLevel, err := parseOutlet(le)
if err != nil {
return nil, errors.Wrapf(err, "cannot parse outlet #%d", lei)
}
var _ logger.Outlet = WriterOutlet{}
var _ logger.Outlet = &SyslogOutlet{}
switch outlet.(type) {
case *SyslogOutlet:
syslogOutlets++
case WriterOutlet:
stdoutOutlets++
}
c.Outlets.Add(outlet, minLevel)
}
if syslogOutlets > 1 {
return nil, errors.Errorf("can only define one 'syslog' outlet")
}
if stdoutOutlets > 1 {
return nil, errors.Errorf("can only define one 'stdout' outlet")
}
return c, nil
}
func parseLogFormat(i interface{}) (f EntryFormatter, err error) {
var is string
switch j := i.(type) {
case string:
is = j
default:
return nil, errors.Errorf("invalid log format: wrong type: %T", i)
}
switch is {
case "human":
return &HumanFormatter{}, nil
case "logfmt":
return &LogfmtFormatter{}, nil
case "json":
return &JSONFormatter{}, nil
default:
return nil, errors.Errorf("invalid log format: '%s'", is)
}
}
func parseOutlet(in config.LoggingOutletEnum) (o logger.Outlet, level logger.Level, err error) {
parseCommon := func(common config.LoggingOutletCommon) (logger.Level, EntryFormatter, error) {
if common.Level == "" || common.Format == "" {
return 0, nil, errors.Errorf("must specify 'level' and 'format' field")
}
minLevel, err := logger.ParseLevel(common.Level)
if err != nil {
return 0, nil, errors.Wrap(err, "cannot parse 'level' field")
}
formatter, err := parseLogFormat(common.Format)
if err != nil {
return 0, nil, errors.Wrap(err, "cannot parse 'formatter' field")
}
return minLevel, formatter, nil
}
var f EntryFormatter
switch v := in.Ret.(type) {
case config.StdoutLoggingOutlet:
level, f, err = parseCommon(v.LoggingOutletCommon)
if err != nil {
break
}
o, err = parseStdoutOutlet(v, f)
case config.TCPLoggingOutlet:
level, f, err = parseCommon(v.LoggingOutletCommon)
if err != nil {
break
}
o, err = parseTCPOutlet(v, f)
case config.SyslogLoggingOutlet:
level, f, err = parseCommon(v.LoggingOutletCommon)
if err != nil {
break
}
o, err = parseSyslogOutlet(v, f)
default:
panic(v)
}
return o, level, err
}
func parseStdoutOutlet(in config.StdoutLoggingOutlet, formatter EntryFormatter) (WriterOutlet, error) {
flags := MetadataAll
writer := os.Stdout
if !isatty.IsTerminal(writer.Fd()) && !in.Time {
flags &= ^MetadataTime
}
formatter.SetMetadataFlags(flags)
return WriterOutlet{
formatter,
os.Stdout,
}, nil
}
func parseTCPOutlet(in config.TCPLoggingOutlet, formatter EntryFormatter) (out *TCPOutlet, err error) {
var tlsConfig *tls.Config
if in.TLS != nil {
tlsConfig, err = func(m *config.TCPLoggingOutletTLS, host string) (*tls.Config, error) {
clientCert, err := tls.LoadX509KeyPair(m.Cert, m.Key)
if err != nil {
return nil, errors.Wrap(err, "cannot load client cert")
}
var rootCAs *x509.CertPool
if m.CA == "" {
if rootCAs, err = x509.SystemCertPool(); err != nil {
return nil, errors.Wrap(err, "cannot open system cert pool")
}
} else {
rootCAs, err = tlsconf.ParseCAFile(m.CA)
if err != nil {
return nil, errors.Wrap(err, "cannot parse CA cert")
}
}
if rootCAs == nil {
panic("invariant violated")
}
return tlsconf.ClientAuthClient(host, rootCAs, clientCert)
}(in.TLS, in.Address)
if err != nil {
return nil, errors.New("cannot not parse TLS config in field 'tls'")
}
}
formatter.SetMetadataFlags(MetadataAll)
return NewTCPOutlet(formatter, in.Net, in.Address, tlsConfig, in.RetryInterval), nil
}
func parseSyslogOutlet(in config.SyslogLoggingOutlet, formatter EntryFormatter) (out *SyslogOutlet, err error) {
out = &SyslogOutlet{}
out.Formatter = formatter
out.Formatter.SetMetadataFlags(MetadataNone)
out.RetryInterval = in.RetryInterval
return out, nil
}

View File

@@ -1,274 +0,0 @@
package cmd
import (
"fmt"
"strings"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/zfs"
)
type DatasetMapFilter struct {
entries []datasetMapFilterEntry
// if set, only valid filter entries can be added using Add()
// and Map() will always return an error
filterMode bool
}
type datasetMapFilterEntry struct {
path *zfs.DatasetPath
// the mapping. since this datastructure acts as both mapping and filter
// we have to convert it to the desired rep dynamically
mapping string
subtreeMatch bool
}
func NewDatasetMapFilter(capacity int, filterMode bool) *DatasetMapFilter {
return &DatasetMapFilter{
entries: make([]datasetMapFilterEntry, 0, capacity),
filterMode: filterMode,
}
}
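// Add registers a path pattern and its mapping; a single trailing '<' marks a subtree match.
// In filter mode, the mapping must be a valid filter result ("ok" or "!").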
func (m *DatasetMapFilter) Add(pathPattern, mapping string) (err error) {
if m.filterMode {
if _, err = m.parseDatasetFilterResult(mapping); err != nil {
return
}
}
// assert path glob adheres to spec
const SUBTREE_PATTERN string = "<"
patternCount := strings.Count(pathPattern, SUBTREE_PATTERN)
switch {
case patternCount > 1:
fallthrough
case patternCount == 1 && !strings.HasSuffix(pathPattern, SUBTREE_PATTERN):
err = fmt.Errorf("pattern invalid: only one '<' at end of string allowed")
return
}
pathStr := strings.TrimSuffix(pathPattern, SUBTREE_PATTERN)
path, err := zfs.NewDatasetPath(pathStr)
if err != nil {
return fmt.Errorf("pattern is not a dataset path: %s", err)
}
entry := datasetMapFilterEntry{
path: path,
mapping: mapping,
subtreeMatch: patternCount > 0,
}
m.entries = append(m.entries, entry)
return
}
// find the most specific prefix mapping we have
//
// longer prefix wins over shorter prefix, direct wins over glob
func (m DatasetMapFilter) mostSpecificPrefixMapping(path *zfs.DatasetPath) (idx int, found bool) {
lcp, lcp_entry_idx := -1, -1
direct_idx := -1
for e := range m.entries {
entry := m.entries[e]
ep := m.entries[e].path
lep := ep.Length()
switch {
case !entry.subtreeMatch && ep.Equal(path):
direct_idx = e
continue
case entry.subtreeMatch && path.HasPrefix(ep) && lep > lcp:
lcp = lep
lcp_entry_idx = e
default:
continue
}
}
if lcp_entry_idx >= 0 || direct_idx >= 0 {
found = true
switch {
case direct_idx >= 0:
idx = direct_idx
case lcp_entry_idx >= 0:
idx = lcp_entry_idx
}
}
return
}
// Returns target == nil if there is no mapping
func (m DatasetMapFilter) Map(source *zfs.DatasetPath) (target *zfs.DatasetPath, err error) {
if m.filterMode {
err = fmt.Errorf("using a filter for mapping simply does not work")
return
}
mi, hasMapping := m.mostSpecificPrefixMapping(source)
if !hasMapping {
return nil, nil
}
me := m.entries[mi]
if me.mapping == "" {
// Special case treatment: 'foo/bar<' => ''
if !me.subtreeMatch {
return nil, fmt.Errorf("mapping to '' must be a subtree match")
}
// ok...
} else {
if strings.HasPrefix("!", me.mapping) {
// reject mapping
return nil, nil
}
}
target, err = zfs.NewDatasetPath(me.mapping)
if err != nil {
err = fmt.Errorf("mapping target is not a dataset path: %s", err)
return
}
if me.subtreeMatch {
// strip common prefix ('<' wildcards are no special case here)
extendComps := source.Copy()
extendComps.TrimPrefix(me.path)
target.Extend(extendComps)
}
return
}
func (m DatasetMapFilter) Filter(p *zfs.DatasetPath) (pass bool, err error) {
if !m.filterMode {
err = fmt.Errorf("using a mapping as a filter does not work")
return
}
mi, hasMapping := m.mostSpecificPrefixMapping(p)
if !hasMapping {
pass = false
return
}
me := m.entries[mi]
pass, err = m.parseDatasetFilterResult(me.mapping)
return
}
// Construct a new filter-only DatasetMapFilter from a mapping
// The new filter allows exactly those paths that were not forbidden by the mapping.
func (m DatasetMapFilter) InvertedFilter() (inv *DatasetMapFilter, err error) {
if m.filterMode {
err = errors.Errorf("can only invert mappings")
return
}
inv = &DatasetMapFilter{
make([]datasetMapFilterEntry, len(m.entries)),
true,
}
for i, e := range m.entries {
inv.entries[i].path, err = zfs.NewDatasetPath(e.mapping)
if err != nil {
err = errors.Wrapf(err, "mapping cannot be inverted: '%s' is not a dataset path", e.mapping)
return
}
inv.entries[i].mapping = MapFilterResultOk
inv.entries[i].subtreeMatch = e.subtreeMatch
}
return inv, nil
}
// FIXME investigate whether we can support more...
func (m DatasetMapFilter) Invert() (endpoint.FSMap, error) {
if m.filterMode {
return nil, errors.Errorf("can only invert mappings")
}
if len(m.entries) != 1 {
return nil, errors.Errorf("inversion of complicated mappings is not implemented") // FIXME
}
e := m.entries[0]
inv := &DatasetMapFilter{
make([]datasetMapFilterEntry, len(m.entries)),
false,
}
mp, err := zfs.NewDatasetPath(e.mapping)
if err != nil {
return nil, err
}
inv.entries[0] = datasetMapFilterEntry{
path: mp,
mapping: e.path.ToString(),
subtreeMatch: e.subtreeMatch,
}
return inv, nil
}
// Creates a new DatasetMapFilter in filter mode from a mapping
// All accepting mapping results are mapped to accepting filter results
// All rejecting mapping results are mapped to rejecting filter results
func (m DatasetMapFilter) AsFilter() endpoint.FSFilter {
f := &DatasetMapFilter{
make([]datasetMapFilterEntry, len(m.entries)),
true,
}
for i, e := range m.entries {
var newe datasetMapFilterEntry = e
if strings.HasPrefix(newe.mapping, "!") {
newe.mapping = MapFilterResultOmit
} else {
newe.mapping = MapFilterResultOk
}
f.entries[i] = newe
}
return f
}
const (
MapFilterResultOk string = "ok"
MapFilterResultOmit string = "!"
)
// Parse a dataset filter result
func (m DatasetMapFilter) parseDatasetFilterResult(result string) (pass bool, err error) {
l := strings.ToLower(result)
if l == MapFilterResultOk {
return true, nil
}
if l == MapFilterResultOmit {
return false, nil
}
return false, fmt.Errorf("'%s' is not a valid filter result", result)
}
func parseDatasetMapFilterFilesystems(in map[string]bool) (f *DatasetMapFilter, err error) {
f = NewDatasetMapFilter(len(in), true)
for pathPattern, accept := range in {
mapping := MapFilterResultOmit
if accept {
mapping = MapFilterResultOk
}
if err = f.Add(pathPattern, mapping); err != nil {
err = fmt.Errorf("invalid mapping entry ['%s':'%s']: %s", pathPattern, mapping, err)
return
}
}
return
}

View File

@@ -1,211 +0,0 @@
package cmd
import (
"io/ioutil"
"fmt"
"github.com/go-yaml/yaml"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/cmd/pruning/retentiongrid"
"github.com/zrepl/zrepl/config"
"os"
)
var ConfigFileDefaultLocations []string = []string{
"/etc/zrepl/zrepl.yml",
"/usr/local/etc/zrepl/zrepl.yml",
}
const (
JobNameControl string = "control"
)
var ReservedJobNames []string = []string{
JobNameControl,
}
type ConfigParsingContext struct {
Global *Global
}
func ParseConfig(path string) (config *Config, err error) {
if path == "" {
// Try default locations
for _, l := range ConfigFileDefaultLocations {
stat, err := os.Stat(l)
if err != nil {
continue
}
if !stat.Mode().IsRegular() {
err = errors.Errorf("file at default location is not a regular file: %s", l)
continue
}
path = l
break
}
}
var i interface{}
var bytes []byte
if bytes, err = ioutil.ReadFile(path); err != nil {
err = errors.WithStack(err)
return
}
if err = yaml.Unmarshal(bytes, &i); err != nil {
err = errors.WithStack(err)
return
}
return parseConfig(i)
}
func parseConfig(i interface{}) (c *Config, err error) {
var asMap struct {
Global map[string]interface{}
Jobs []map[string]interface{}
}
if err := mapstructure.Decode(i, &asMap); err != nil {
return nil, errors.Wrap(err, "config root must be a dict")
}
c = &Config{}
// Parse global with defaults
c.Global.Serve.Stdinserver.SockDir = "/var/run/zrepl/stdinserver"
c.Global.Control.Sockpath = "/var/run/zrepl/control"
err = mapstructure.Decode(asMap.Global, &c.Global)
if err != nil {
err = errors.Wrap(err, "mapstructure error on 'global' section: %s")
return
}
if c.Global.logging, err = parseLogging(asMap.Global["logging"]); err != nil {
return nil, errors.Wrap(err, "cannot parse logging section")
}
cpc := ConfigParsingContext{&c.Global}
jpc := JobParsingContext{cpc}
c.Jobs = make(map[string]Job, len(asMap.Jobs))
// FIXME internal jobs should not be mixed with user jobs
// Monitoring Jobs
var monJobs []map[string]interface{}
if err := mapstructure.Decode(asMap.Global["monitoring"], &monJobs); err != nil {
return nil, errors.Wrap(err, "cannot parse monitoring section")
}
for i, jc := range monJobs {
if jc["name"] == "" || jc["name"] == nil {
// FIXME internal jobs should not require a name...
jc["name"] = fmt.Sprintf("prometheus-%d", i)
}
job, err := parseJob(jpc, jc)
if err != nil {
return nil, errors.Wrapf(err, "cannot parse monitoring job #%d", i)
}
if job.JobType() != JobTypePrometheus {
return nil, errors.Errorf("monitoring job #%d has invalid job type", i)
}
c.Jobs[job.JobName()] = job
}
// Regular Jobs
for i := range asMap.Jobs {
job, err := parseJob(jpc, asMap.Jobs[i])
if err != nil {
// Try to find its name
namei, ok := asMap.Jobs[i]["name"]
if !ok {
namei = fmt.Sprintf("<no name, entry #%d in list>", i)
}
err = errors.Wrapf(err, "cannot parse job '%v'", namei)
return nil, err
}
jn := job.JobName()
if _, ok := c.Jobs[jn]; ok {
err = errors.Errorf("duplicate or invalid job name: %s", jn)
return nil, err
}
c.Jobs[job.JobName()] = job
}
return c, nil
}
type JobParsingContext struct {
ConfigParsingContext
}
func parseJob(c config.Global, in config.JobEnum) (j Job, err error) {
switch v := in.Ret.(type) {
case config.PullJob:
return parsePullJob(c, v)
case config.SourceJob:
return parseSourceJob(c, v)
case config.LocalJob:
return parseLocalJob(c, v)
default:
panic(fmt.Sprintf("implementation error: unknown job type %s", v))
}
}
func parseConnect(in config.ConnectEnum) (c streamrpc.Connecter, err error) {
switch v := in.Ret.(type) {
case config.SSHStdinserverConnect:
return parseSSHStdinserverConnecter(v)
case config.TCPConnect:
return parseTCPConnecter(v)
case config.TLSConnect:
return parseTLSConnecter(v)
default:
panic(fmt.Sprintf("unknown connect type %v", v))
}
}
func parsePruning(in []config.PruningEnum, willSeeBookmarks bool) (p Pruner, err error) {
policies := make([]PrunePolicy, len(in))
for i := range in {
if policies[i], err = parseKeepRule(in[i]); err != nil {
return nil, errors.Wrapf(err, "invalid keep rule #%d:", i)
}
}
}
func parseKeepRule(in config.PruningEnum) (p PrunePolicy, err error) {
switch v := in.Ret.(type) {
case config.PruneGrid:
return retentiongrid.ParseGridPrunePolicy(v, willSeeBookmarks)
//case config.PruneKeepLastN:
//case config.PruneKeepRegex:
//case config.PruneKeepNotReplicated:
default:
panic(fmt.Sprintf("unknown keep rule type %v", v))
}
}
func parseAuthenticatedChannelListenerFactory(c config.Global, in config.ServeEnum) (p ListenerFactory, err error) {
switch v := in.Ret.(type) {
case config.StdinserverServer:
return parseStdinserverListenerFactory(c, v)
case config.TCPServe:
return parseTCPListenerFactory(c, v)
case config.TLSServe:
return parseTLSListenerFactory(c, v)
default:
panic(fmt.Sprintf("unknown listener type %v", v))
}
}

View File

@@ -1,11 +0,0 @@
package cmd
import "github.com/zrepl/zrepl/zfs"
type NoPrunePolicy struct{}
func (p NoPrunePolicy) Prune(fs *zfs.DatasetPath, versions []zfs.FilesystemVersion) (keep, remove []zfs.FilesystemVersion, err error) {
keep = versions
remove = []zfs.FilesystemVersion{}
return
}

View File

@@ -1,58 +0,0 @@
package cmd
import (
"github.com/problame/go-netssh"
"github.com/zrepl/zrepl/cmd/helpers"
"github.com/zrepl/zrepl/config"
"net"
"path"
)
type StdinserverListenerFactory struct {
ClientIdentity string
sockpath string
}
func parseStdinserverListenerFactory(c config.Global, in config.StdinserverServer) (f *StdinserverListenerFactory, err error) {
f = &StdinserverListenerFactory{
ClientIdentity: in.ClientIdentity,
}
f.sockpath = path.Join(c.Serve.StdinServer.SockDir, f.ClientIdentity)
return
}
func (f *StdinserverListenerFactory) Listen() (net.Listener, error) {
if err := helpers.PreparePrivateSockpath(f.sockpath); err != nil {
return nil, err
}
l, err := netssh.Listen(f.sockpath)
if err != nil {
return nil, err
}
return StdinserverListener{l}, nil
}
type StdinserverListener struct {
l *netssh.Listener
}
func (l StdinserverListener) Addr() net.Addr {
return netsshAddr{}
}
func (l StdinserverListener) Accept() (net.Conn, error) {
c, err := l.l.Accept()
if err != nil {
return nil, err
}
return netsshConnToNetConnAdatper{c}, nil
}
func (l StdinserverListener) Close() (err error) {
return l.l.Close()
}

View File

@@ -1,25 +0,0 @@
package cmd
import (
"github.com/zrepl/zrepl/config"
"net"
"time"
)
type TCPListenerFactory struct {
Address string
}
func parseTCPListenerFactory(c config.Global, in config.TCPServe) (*TCPListenerFactory, error) {
lf := &TCPListenerFactory{
Address: in.Listen,
}
return lf, nil
}
var TCPListenerHandshakeTimeout = 10 * time.Second // FIXME make configurable
func (f *TCPListenerFactory) Listen() (net.Listener, error) {
return net.Listen("tcp", f.Address)
}

View File

@@ -1,78 +0,0 @@
package cmd
import (
"crypto/tls"
"crypto/x509"
"net"
"time"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/cmd/tlsconf"
"github.com/zrepl/zrepl/config"
)
type TCPListenerFactory struct {
Address string
tls bool
clientCA *x509.CertPool
serverCert tls.Certificate
clientCommonName string
}
func parseTCPListenerFactory(c config.Global, in config.TCPServe) (*TCPListenerFactory, error) {
lf := &TCPListenerFactory{
Address: in.Listen,
}
if in.TLS != nil {
err := func(i map[string]interface{}) (err error) {
var in struct {
CA string
Cert string
Key string
ClientCN string `mapstructure:"client_cn"`
}
if err := mapstructure.Decode(i, &in); err != nil {
return errors.Wrap(err, "mapstructure error")
}
if in.CA == "" || in.Cert == "" || in.Key == "" || in.ClientCN == "" {
return errors.New("fields 'ca', 'cert', 'key' and 'client_cn' must be specified")
}
lf.clientCommonName = in.ClientCN
lf.clientCA, err = tlsconf.ParseCAFile(in.CA)
if err != nil {
return errors.Wrap(err, "cannot parse ca file")
}
lf.serverCert, err = tls.LoadX509KeyPair(in.Cert, in.Key)
if err != nil {
return errors.Wrap(err, "cannot parse cer/key pair")
}
lf.tls = true // mark success
return nil
}(in.TLS)
if err != nil {
return nil, errors.Wrap(err, "error parsing TLS config in field 'tls'")
}
}
return lf, nil
}
var TCPListenerHandshakeTimeout = 10 * time.Second // FIXME make configurable
func (f *TCPListenerFactory) Listen() (net.Listener, error) {
l, err := net.Listen("tcp", f.Address)
if !f.tls || err != nil {
return l, err
}
tl := tlsconf.NewClientAuthListener(l, f.clientCA, f.serverCert, f.clientCommonName, TCPListenerHandshakeTimeout)
return tl, nil
}

View File

@@ -1,283 +0,0 @@
package cmd
import (
"testing"
"time"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/zrepl/zrepl/util"
"github.com/zrepl/zrepl/zfs"
)
func TestSampleConfigsAreParsedWithoutErrors(t *testing.T) {
paths := []string{
"./sampleconf/localbackup/host1.yml",
"./sampleconf/pullbackup/backuphost.yml",
"./sampleconf/pullbackup/productionhost.yml",
"./sampleconf/random/debugging.yml",
"./sampleconf/random/logging_and_monitoring.yml",
}
for _, p := range paths {
c, err := ParseConfig(p)
if err != nil {
t.Errorf("error parsing %s:\n%+v", p, err)
}
t.Logf("file: %s", p)
t.Log(pretty.Sprint(c))
}
}
func TestParseRetentionGridStringParsing(t *testing.T) {
intervals, err := parseRetentionGridIntervalsString("2x10m(keep=2) | 1x1h | 3x1w")
assert.Nil(t, err)
assert.Len(t, intervals, 6)
proto := util.RetentionInterval{
KeepCount: 2,
Length: 10 * time.Minute,
}
assert.EqualValues(t, proto, intervals[0])
assert.EqualValues(t, proto, intervals[1])
proto.KeepCount = 1
proto.Length = 1 * time.Hour
assert.EqualValues(t, proto, intervals[2])
proto.Length = 7 * 24 * time.Hour
assert.EqualValues(t, proto, intervals[3])
assert.EqualValues(t, proto, intervals[4])
assert.EqualValues(t, proto, intervals[5])
intervals, err = parseRetentionGridIntervalsString("|")
assert.Error(t, err)
intervals, err = parseRetentionGridIntervalsString("2x10m")
assert.NoError(t, err)
intervals, err = parseRetentionGridIntervalsString("1x10m(keep=all)")
assert.NoError(t, err)
assert.Len(t, intervals, 1)
assert.EqualValues(t, util.RetentionGridKeepCountAll, intervals[0].KeepCount)
}
func TestDatasetMapFilter(t *testing.T) {
expectMapping := func(m map[string]string, from, to string) {
dmf, err := parseDatasetMapFilter(m, false)
if err != nil {
t.Logf("expect test map to be valid: %s", err)
t.FailNow()
}
fromPath, err := zfs.NewDatasetPath(from)
if err != nil {
t.Logf("expect test from path to be valid: %s", err)
t.FailNow()
}
res, err := dmf.Map(fromPath)
if to == "" {
assert.Nil(t, res)
assert.Nil(t, err)
t.Logf("%s => NOT MAPPED", fromPath.ToString())
return
}
assert.Nil(t, err)
toPath, err := zfs.NewDatasetPath(to)
if err != nil {
t.Logf("expect test to path to be valid: %s", err)
t.FailNow()
}
assert.True(t, res.Equal(toPath))
}
expectFilter := func(m map[string]string, path string, pass bool) {
dmf, err := parseDatasetMapFilter(m, true)
if err != nil {
t.Logf("expect test filter to be valid: %s", err)
t.FailNow()
}
p, err := zfs.NewDatasetPath(path)
if err != nil {
t.Logf("expect test path to be valid: %s", err)
t.FailNow()
}
res, err := dmf.Filter(p)
assert.Nil(t, err)
assert.Equal(t, pass, res)
}
map1 := map[string]string{
"a/b/c<": "root1",
"a/b<": "root2",
"<": "root3/b/c",
"b": "!",
"a/b/c/d/e<": "!",
"q<": "root4/1/2",
}
expectMapping(map1, "a/b/c", "root1")
expectMapping(map1, "a/b/c/d", "root1/d")
expectMapping(map1, "a/b/c/d/e", "")
expectMapping(map1, "a/b/e", "root2/e")
expectMapping(map1, "a/b", "root2")
expectMapping(map1, "x", "root3/b/c/x")
expectMapping(map1, "x/y", "root3/b/c/x/y")
expectMapping(map1, "q", "root4/1/2")
expectMapping(map1, "b", "")
expectMapping(map1, "q/r", "root4/1/2/r")
map2 := map[string]string{ // identity mapping
"<": "",
}
expectMapping(map2, "foo/bar", "foo/bar")
map3 := map[string]string{ // subtree to local mapping, need that for Invert()
"foo/bar<": "",
}
{
m, _ := parseDatasetMapFilter(map3, false)
p, _ := zfs.NewDatasetPath("foo/bar")
tp, err := m.Map(p)
assert.Nil(t, err)
assert.True(t, tp.Empty())
expectMapping(map3, "foo/bar/x", "x")
expectMapping(map3, "x", "")
}
filter1 := map[string]string{
"<": "!",
"a<": "ok",
"a/b<": "!",
}
expectFilter(filter1, "b", false)
expectFilter(filter1, "a", true)
expectFilter(filter1, "a/d", true)
expectFilter(filter1, "a/b", false)
expectFilter(filter1, "a/b/c", false)
filter2 := map[string]string{}
expectFilter(filter2, "foo", false) // default to omit
}
func TestDatasetMapFilter_AsFilter(t *testing.T) {
mapspec := map[string]string{
"a/b/c<": "root1",
"a/b<": "root2",
"<": "root3/b/c",
"b": "!",
"a/b/c/d/e<": "!",
"q<": "root4/1/2",
}
m, err := parseDatasetMapFilter(mapspec, false)
assert.Nil(t, err)
f := m.AsFilter()
t.Logf("Mapping:\n%s\nFilter:\n%s", pretty.Sprint(m), pretty.Sprint(f))
tf := func(f zfs.DatasetFilter, path string, pass bool) {
p, err := zfs.NewDatasetPath(path)
assert.Nil(t, err)
r, err := f.Filter(p)
assert.Nil(t, err)
assert.Equal(t, pass, r)
}
tf(f, "a/b/c", true)
tf(f, "a/b", true)
tf(f, "b", false)
tf(f, "a/b/c/d/e", false)
tf(f, "a/b/c/d/e/f", false)
tf(f, "a", true)
}
func TestDatasetMapFilter_InvertedFilter(t *testing.T) {
mapspec := map[string]string{
"a/b": "1/2",
"a/b/c<": "3",
"a/b/c/d<": "1/2/a",
"a/b/d": "!",
}
m, err := parseDatasetMapFilter(mapspec, false)
assert.Nil(t, err)
inv, err := m.InvertedFilter()
assert.Nil(t, err)
t.Log(pretty.Sprint(inv))
expectMapping := func(m *DatasetMapFilter, ps string, expRes bool) {
p, err := zfs.NewDatasetPath(ps)
assert.Nil(t, err)
r, err := m.Filter(p)
assert.Nil(t, err)
assert.Equal(t, expRes, r)
}
expectMapping(inv, "4", false)
expectMapping(inv, "3", true)
expectMapping(inv, "3/x", true)
expectMapping(inv, "1", false)
expectMapping(inv, "1/2", true)
expectMapping(inv, "1/2/3", false)
expectMapping(inv, "1/2/a/b", true)
}
func TestDatasetMapFilter_Invert(t *testing.T) {
mapspec := map[string]string{
"<": "foo/bar",
}
m, err := parseDatasetMapFilter(mapspec, false)
assert.NoError(t, err)
invI, err := m.Invert()
assert.NoError(t, err)
inv, ok := invI.(*DatasetMapFilter)
assert.True(t, ok)
expectMapping := func(m *DatasetMapFilter, input, expect string, expErr bool, expEmpty bool) {
p, err := zfs.NewDatasetPath(input)
assert.Nil(t, err)
r, err := m.Map(p)
if expErr {
assert.Nil(t, r)
assert.Error(t, err)
return
}
if expEmpty {
assert.Nil(t, err)
assert.True(t, r.Empty())
} else if expect == "" {
assert.Nil(t, r)
assert.Nil(t, err)
} else {
assert.Nil(t, err)
assert.NotNil(t, r)
assert.Equal(t, expect, r.ToString())
}
}
expectMapping(inv, "x", "", false, false)
expectMapping(inv, "foo/bar", "", false, true)
expectMapping(inv, "foo/bar/bee", "bee", false, false)
}

View File

@@ -1,156 +0,0 @@
package cmd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/cmd/daemon"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/version"
"io"
golog "log"
"net"
"net/http"
"os"
)
var controlCmd = &cobra.Command{
Use: "control",
Short: "control zrepl daemon",
}
var pprofCmd = &cobra.Command{
Use: "pprof off | [on TCP_LISTEN_ADDRESS]",
Short: "start a http server exposing go-tool-compatible profiling endpoints at TCP_LISTEN_ADDRESS",
Run: doControlPProf,
PreRunE: func(cmd *cobra.Command, args []string) error {
if cmd.Flags().NArg() < 1 {
goto enargs
}
switch cmd.Flags().Arg(0) {
case "on":
pprofCmdArgs.msg.Run = true
if cmd.Flags().NArg() != 2 {
return errors.New("must specify TCP_LISTEN_ADDRESS as second positional argument")
}
pprofCmdArgs.msg.HttpListenAddress = cmd.Flags().Arg(1)
case "off":
if cmd.Flags().NArg() != 1 {
goto enargs
}
pprofCmdArgs.msg.Run = false
}
return nil
enargs:
return errors.New("invalid number of positional arguments")
},
}
var pprofCmdArgs struct {
msg daemon.PprofServerControlMsg
}
var controlVersionCmd = &cobra.Command{
Use: "version",
Short: "print version of running zrepl daemon",
Run: doControLVersionCmd,
}
var controlStatusCmdArgs struct {
format string
level logger.Level
onlyShowJob string
}
func init() {
RootCmd.AddCommand(controlCmd)
controlCmd.AddCommand(pprofCmd)
controlCmd.AddCommand(controlVersionCmd)
controlStatusCmdArgs.level = logger.Warn
}
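// controlHttpClient builds an http.Client whose transport dials the daemon's
// unix control socket instead of a TCP address.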
func controlHttpClient() (client http.Client, err error) {
conf, err := ParseConfig(rootArgs.configFile)
if err != nil {
return http.Client{}, err
}
return http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", conf.Global.Control.Sockpath)
},
},
}, nil
}
func doControlPProf(cmd *cobra.Command, args []string) {
log := golog.New(os.Stderr, "", 0)
die := func() {
log.Printf("exiting after error")
os.Exit(1)
}
log.Printf("connecting to zrepl daemon")
httpc, err := controlHttpClient()
if err != nil {
log.Printf("error parsing config: %s", err)
die()
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(&pprofCmdArgs.msg); err != nil {
log.Printf("error marshaling request: %s", err)
die()
}
_, err = httpc.Post("http://unix"+daemon.ControlJobEndpointPProf, "application/json", &buf)
if err != nil {
log.Printf("error: %s", err)
die()
}
log.Printf("finished")
}
func doControLVersionCmd(cmd *cobra.Command, args []string) {
log := golog.New(os.Stderr, "", 0)
die := func() {
log.Printf("exiting after error")
os.Exit(1)
}
httpc, err := controlHttpClient()
if err != nil {
log.Printf("could not connect to daemon: %s", err)
die()
}
resp, err := httpc.Get("http://unix" + daemon.ControlJobEndpointVersion)
if err != nil {
log.Printf("error: %s", err)
die()
} else if resp.StatusCode != http.StatusOK {
var msg bytes.Buffer
io.CopyN(&msg, resp.Body, 4096)
log.Printf("error: %s", msg.String())
die()
}
var info version.ZreplVersionInformation
err = json.NewDecoder(resp.Body).Decode(&info)
if err != nil {
log.Printf("error unmarshaling response: %s", err)
die()
}
fmt.Println(info.String())
}

View File

@@ -1,142 +0,0 @@
package daemon
import (
"bytes"
"context"
"encoding/json"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/cmd/daemon/job"
"github.com/zrepl/zrepl/cmd/helpers"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/version"
"io"
"net"
"net/http"
)
type controlJob struct {
sockaddr *net.UnixAddr
jobs *jobs
}
func newControlJob(sockpath string, jobs *jobs) (j *controlJob, err error) {
j = &controlJob{jobs: jobs}
j.sockaddr, err = net.ResolveUnixAddr("unix", sockpath)
if err != nil {
err = errors.Wrap(err, "cannot resolve unix address")
return
}
return
}
func (j *controlJob) Name() string { return jobNameControl }
func (j *controlJob) Status() interface{} { return nil }
const (
ControlJobEndpointPProf string = "/debug/pprof"
ControlJobEndpointVersion string = "/version"
ControlJobEndpointStatus string = "/status"
)
func (j *controlJob) Run(ctx context.Context) {
log := job.GetLogger(ctx)
defer log.Info("control job finished")
l, err := helpers.ListenUnixPrivate(j.sockaddr)
if err != nil {
log.WithError(err).Error("error listening")
return
}
pprofServer := NewPProfServer(ctx)
mux := http.NewServeMux()
mux.Handle(ControlJobEndpointPProf, requestLogger{log: log, handlerFunc: func(w http.ResponseWriter, r *http.Request) {
var msg PprofServerControlMsg
err := json.NewDecoder(r.Body).Decode(&msg)
if err != nil {
log.WithError(err).Error("bad pprof request from client")
w.WriteHeader(http.StatusBadRequest)
}
pprofServer.Control(msg)
w.WriteHeader(200)
}})
mux.Handle(ControlJobEndpointVersion,
requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
return version.NewZreplVersionInformation(), nil
}}})
mux.Handle(ControlJobEndpointStatus,
requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
s := j.jobs.status()
return s, nil
}}})
server := http.Server{Handler: mux}
outer:
for {
served := make(chan error)
go func() {
served <- server.Serve(l)
close(served)
}()
select {
case <-ctx.Done():
log.WithError(ctx.Err()).Info("context done")
server.Shutdown(context.Background())
break outer
case err = <-served:
if err != nil {
log.WithError(err).Error("error serving")
break outer
}
}
}
}
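// jsonResponder invokes the producer and writes its result as JSON; on error it
// responds with status 500 and the error text.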
type jsonResponder struct {
producer func() (interface{}, error)
}
func (j jsonResponder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
res, err := j.producer()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, err.Error())
return
}
var buf bytes.Buffer
err = json.NewEncoder(&buf).Encode(res)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, err.Error())
} else {
io.Copy(w, &buf)
}
}
type requestLogger struct {
log logger.Logger
handler http.Handler
handlerFunc http.HandlerFunc
}
func (l requestLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log := l.log.WithField("method", r.Method).WithField("url", r.URL)
log.Info("start")
if l.handlerFunc != nil {
l.handlerFunc(w, r)
} else if l.handler != nil {
l.handler.ServeHTTP(w, r)
} else {
log.Error("no handler or handlerFunc configured")
}
log.Info("finish")
}

View File

@@ -1,174 +0,0 @@
package daemon
import (
"context"
"fmt"
"github.com/zrepl/zrepl/cmd/daemon/job"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/version"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
)
func Run(ctx context.Context, controlSockpath string, outlets *logger.Outlets, confJobs []job.Job) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
cancel()
}()
log := logger.NewLogger(outlets, 1*time.Second)
log.Info(version.NewZreplVersionInformation().String())
// parse config
for _, job := range confJobs {
if IsInternalJobName(job.Name()) {
panic(fmt.Sprintf("internal job name used for config job '%s'", job.Name())) //FIXME
}
}
ctx = job.WithLogger(ctx, log)
jobs := newJobs()
// start control socket
controlJob, err := newControlJob(controlSockpath, jobs)
if err != nil {
panic(err) // FIXME
}
jobs.start(ctx, controlJob, true)
// start prometheus
//var promJob *prometheusJob // FIXME
//jobs.start(ctx, promJob, true)
log.Info("starting daemon")
// start regular jobs
for _, j := range confJobs {
jobs.start(ctx, j, false)
}
select {
case <-jobs.wait():
log.Info("all jobs finished")
case <-ctx.Done():
log.WithError(ctx.Err()).Info("context finished")
}
log.Info("daemon exiting")
}
type jobs struct {
wg sync.WaitGroup
// m protects all fields below it
m sync.RWMutex
wakeups map[string]job.WakeupChan // by JobName
jobs map[string]job.Job
}
func newJobs() *jobs {
return &jobs{
wakeups: make(map[string]job.WakeupChan),
jobs: make(map[string]job.Job),
}
}
const (
logJobField string = "job"
logTaskField string = "task"
logSubsysField string = "subsystem"
)
func (s *jobs) wait() <-chan struct{} {
ch := make(chan struct{})
go func() {
s.wg.Wait()
close(ch) // signal waiters once all jobs have returned
}()
return ch
}
func (s *jobs) status() map[string]interface{} {
s.m.RLock()
defer s.m.RUnlock()
type res struct {
name string
status interface{}
}
var wg sync.WaitGroup
c := make(chan res, len(s.jobs))
for name, j := range s.jobs {
wg.Add(1)
go func(name string, j job.Job) {
defer wg.Done()
c <- res{name: name, status: j.Status()}
}(name, j)
}
wg.Wait()
close(c)
ret := make(map[string]interface{}, len(s.jobs))
for res := range c {
ret[res.name] = res.status
}
return ret
}
const (
jobNamePrometheus = "_prometheus"
jobNameControl = "_control"
)
func IsInternalJobName(s string) bool {
return strings.HasPrefix(s, "_")
}
func (s *jobs) start(ctx context.Context, j job.Job, internal bool) {
s.m.Lock()
defer s.m.Unlock()
jobLog := job.GetLogger(ctx).WithField(logJobField, j.Name())
jobName := j.Name()
if !internal && IsInternalJobName(jobName) {
panic(fmt.Sprintf("internal job name used for non-internal job %s", jobName))
}
if internal && !IsInternalJobName(jobName) {
panic(fmt.Sprintf("internal job does not use internal job name %s", jobName))
}
if _, ok := s.jobs[jobName]; ok {
panic(fmt.Sprintf("duplicate job name %s", jobName))
}
s.jobs[jobName] = j
ctx = job.WithLogger(ctx, jobLog)
ctx, wakeupChan := job.WithWakeup(ctx)
s.wakeups[jobName] = wakeupChan
s.wg.Add(1)
go func() {
defer s.wg.Done()
jobLog.Info("starting job")
defer jobLog.Info("job exited")
j.Run(ctx)
}()
}
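
For reference, a hedged sketch of handing a trivial job to Run from within this package; the noopJob type and the wiring function are hypothetical, real jobs come from the parsed config:

package daemon

import (
	"context"

	"github.com/zrepl/zrepl/cmd/daemon/job"
	"github.com/zrepl/zrepl/logger"
)

// noopJob is hypothetical; it satisfies job.Job and does nothing but wait.
type noopJob struct{ name string }

func (j noopJob) Name() string            { return j.name }
func (j noopJob) Status() interface{}     { return nil }
func (j noopJob) Run(ctx context.Context) { <-ctx.Done() } // block until the daemon shuts down

// runNoop hands a single trivial job to Run; outlets are built by the caller.
func runNoop(ctx context.Context, controlSockpath string, outlets *logger.Outlets) {
	Run(ctx, controlSockpath, outlets, []job.Job{noopJob{name: "noop"}})
}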

View File

@ -1,47 +0,0 @@
package job
import (
"context"
"github.com/zrepl/zrepl/logger"
)
type Logger = logger.Logger
type contextKey int
const (
contextKeyLog contextKey = iota
contextKeyWakeup
)
func GetLogger(ctx context.Context) Logger {
if l, ok := ctx.Value(contextKeyLog).(Logger); ok {
return l
}
return logger.NewNullLogger()
}
func WithLogger(ctx context.Context, l Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, l)
}
func WithWakeup(ctx context.Context) (context.Context, WakeupChan) {
wc := make(chan struct{}, 1)
return context.WithValue(ctx, contextKeyWakeup, wc), wc
}
type Job interface {
Name() string
Run(ctx context.Context)
Status() interface{}
}
type WakeupChan <-chan struct{}
func WaitWakeup(ctx context.Context) WakeupChan {
wc, ok := ctx.Value(contextKeyWakeup).(WakeupChan)
if !ok {
wc = make(chan struct{})
}
return wc
}
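
A minimal sketch of how a job implementation might consume these helpers inside its Run method; the package name and the 10-minute ticker interval are illustrative assumptions:

package examplejob // hypothetical consumer of the job package

import (
	"context"
	"time"

	"github.com/zrepl/zrepl/cmd/daemon/job"
)

func run(ctx context.Context) {
	log := job.GetLogger(ctx)     // null logger if the daemon did not attach one
	wakeup := job.WaitWakeup(ctx) // never fires if no wakeup channel was attached
	ticker := time.NewTicker(10 * time.Minute) // interval is an assumption
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Info("context done, exiting")
			return
		case <-wakeup:
			log.Info("woken up early")
		case <-ticker.C:
			log.Info("periodic invocation")
		}
	}
}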

View File

@ -1,80 +0,0 @@
package daemon
import (
"net/http"
// FIXME: importing this package has the side-effect of poisoning the http.DefaultServeMux
// FIXME: with the /debug/pprof endpoints
"context"
"net"
"net/http/pprof"
)
type PProfServer struct {
cc chan PprofServerControlMsg
state PprofServerControlMsg
listener net.Listener
}
type PprofServerControlMsg struct {
// Whether the server should listen for requests on the given address
Run bool
// Must be set if Run is true, undefined otherwise
HttpListenAddress string
}
func NewPProfServer(ctx context.Context) *PProfServer {
s := &PProfServer{
cc: make(chan PprofServerControlMsg),
}
go s.controlLoop(ctx)
return s
}
func (s *PProfServer) controlLoop(ctx context.Context) {
outer:
for {
var msg PprofServerControlMsg
select {
case <-ctx.Done():
if s.listener != nil {
s.listener.Close()
}
break outer
case msg = <-s.cc:
// proceed
}
var err error
if msg.Run && s.listener == nil {
s.listener, err = net.Listen("tcp", msg.HttpListenAddress)
if err != nil {
s.listener = nil
continue
}
// FIXME: because net/http/pprof does not provide a mux, we register its handlers on a private mux ourselves
mux := http.NewServeMux()
mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
go http.Serve(s.listener, mux)
continue
}
if !msg.Run && s.listener != nil {
s.listener.Close()
s.listener = nil
continue
}
}
}
func (s *PProfServer) Control(msg PprofServerControlMsg) {
s.cc <- msg
}
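
Usage is purely message-driven; a short sketch from within the same package, with an assumed loopback listen address:

package daemon

import "context"

// examplePprofToggle shows the message-driven control flow; in zrepl this is
// normally triggered via the /debug/pprof control endpoint.
func examplePprofToggle(ctx context.Context) {
	s := NewPProfServer(ctx)
	s.Control(PprofServerControlMsg{Run: true, HttpListenAddress: "127.0.0.1:6060"})
	// ... profile for a while, then stop listening again
	s.Control(PprofServerControlMsg{Run: false})
}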

View File

@ -1,82 +0,0 @@
package daemon
import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/zrepl/zrepl/cmd/daemon/job"
"github.com/zrepl/zrepl/zfs"
"net"
"net/http"
)
type prometheusJob struct {
listen string
}
func newPrometheusJob(listen string) *prometheusJob {
return &prometheusJob{listen}
}
var prom struct {
taskLastActiveStart *prometheus.GaugeVec
taskLastActiveDuration *prometheus.GaugeVec
taskLogEntries *prometheus.CounterVec
}
func init() {
prom.taskLastActiveStart = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "zrepl",
Subsystem: "daemon",
Name: "task_last_active_start",
Help: "point in time at which the job task last left idle state",
}, []string{"zrepl_job", "job_type", "task"})
prom.taskLastActiveDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "zrepl",
Subsystem: "daemon",
Name: "task_last_active_duration",
Help: "seconds that the last run ob a job task spent between leaving and re-entering idle state",
}, []string{"zrepl_job", "job_type", "task"})
prom.taskLogEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "zrepl",
Subsystem: "daemon",
Name: "task_log_entries",
Help: "number of log entries per job task and level",
}, []string{"zrepl_job", "job_type", "task", "level"})
prometheus.MustRegister(prom.taskLastActiveStart)
prometheus.MustRegister(prom.taskLastActiveDuration)
prometheus.MustRegister(prom.taskLogEntries)
}
func (j *prometheusJob) Name() string { return jobNamePrometheus }
func (j *prometheusJob) Status() interface{} { return nil }
func (j *prometheusJob) Run(ctx context.Context) {
if err := zfs.PrometheusRegister(prometheus.DefaultRegisterer); err != nil {
panic(err)
}
log := job.GetLogger(ctx)
l, err := net.Listen("tcp", j.listen)
if err != nil {
log.WithError(err).Error("cannot listen")
return
}
go func() {
<-ctx.Done()
l.Close()
}()
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
err = http.Serve(l, mux)
if err != nil {
log.WithError(err).Error("error while serving")
}
}
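
A hedged sketch of what the commented-out promJob wiring in Run (earlier in this commit) might have looked like; the listen address is an assumption:

package daemon

import "context"

// startPrometheusSketch mirrors the commented-out prometheus wiring in Run.
func startPrometheusSketch(ctx context.Context, js *jobs) {
	promJob := newPrometheusJob("127.0.0.1:9090") // assumed listen address
	js.start(ctx, promJob, true)                  // internal job, named jobNamePrometheus
}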

View File

@ -1,176 +0,0 @@
package cmd
import (
"context"
"fmt"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/cmd/daemon"
"github.com/zrepl/zrepl/cmd/daemon/job"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/logger"
"os"
"os/signal"
"syscall"
"time"
)
// daemonCmd represents the daemon command
var daemonCmd = &cobra.Command{
Use: "daemon",
Short: "start daemon",
Run: doDaemon,
}
func init() {
RootCmd.AddCommand(daemonCmd)
}
type Job interface {
JobName() string
JobType() JobType
JobStart(ctxt context.Context)
}
type JobType string
const (
JobTypePull JobType = "pull"
JobTypeSource JobType = "source"
JobTypeLocal JobType = "local"
JobTypePrometheus JobType = "prometheus"
JobTypeControl JobType = "control"
)
func ParseUserJobType(s string) (JobType, error) {
switch s {
case "pull":
return JobTypePull, nil
case "source":
return JobTypeSource, nil
case "local":
return JobTypeLocal, nil
case "prometheus":
return JobTypePrometheus, nil
}
return "", fmt.Errorf("unknown job type '%s'", s)
}
func (j JobType) String() string {
return string(j)
}
type daemonJobAdaptor struct {
j Job
}
func (a daemonJobAdaptor) Name() string {
return a.j.JobName()
}
func (a daemonJobAdaptor) Run(ctx context.Context) {
a.j.JobStart(ctx)
}
func (a daemonJobAdaptor) Status() interface{} { return nil }
func doDaemon(cmd *cobra.Command, args []string) {
conf, err := config.ParseConfig(rootArgs.configFile)
if err != nil {
fmt.Fprintf(os.Stderr, "error parsing config: %s\n", err)
os.Exit(1)
}
outlets, err := parseLogging(conf.Global.Logging)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to generate logger: %s\n", err)
return
}
log := logger.NewLogger(outlets.Outlets, 1*time.Second)
ctx := WithLogger(context.Background(), log)
daemonJobs := make([]job.Job, 0, len(conf.Jobs))
for i := range conf.Jobs {
daemonJobs = append(daemonJobs, daemonJobAdaptor{conf.Jobs[i]})
}
daemon.Run(ctx, conf.Global.Control.Sockpath, outlets.Outlets, daemonJobs)
}
type contextKey string
const (
contextKeyLog contextKey = contextKey("log")
contextKeyDaemon contextKey = contextKey("daemon")
)
func getLogger(ctx context.Context) Logger {
return ctx.Value(contextKeyLog).(Logger)
}
func WithLogger(ctx context.Context, l Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, l)
}
type Daemon struct {
conf *Config
startedAt time.Time
}
func NewDaemon(initialConf *Config) *Daemon {
return &Daemon{conf: initialConf}
}
func (d *Daemon) Loop(ctx context.Context) {
d.startedAt = time.Now()
log := getLogger(ctx)
ctx, cancel := context.WithCancel(ctx)
ctx = context.WithValue(ctx, contextKeyDaemon, d)
sigChan := make(chan os.Signal, 1)
finishs := make(chan Job)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
log.Info("starting jobs from config")
i := 0
for _, job := range d.conf.Jobs {
logger := log.WithField(logJobField, job.JobName())
logger.Info("starting")
i++
jobCtx := WithLogger(ctx, logger)
go func(j Job) {
j.JobStart(jobCtx)
finishs <- j
}(job)
}
finishCount := 0
outer:
for {
select {
case <-finishs:
finishCount++
if finishCount == len(d.conf.Jobs) {
log.Info("all jobs finished")
break outer
}
case sig := <-sigChan:
log.WithField("signal", sig).Info("received signal")
log.Info("cancelling all jobs")
cancel()
}
}
signal.Stop(sigChan)
cancel() // make go vet happy
log.Info("exiting")
}
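
The adaptor bridges the old cmd Job interface to the new job.Job interface; a sketch with a hypothetical legacy job:

package cmd

import (
	"context"

	"github.com/zrepl/zrepl/cmd/daemon/job"
)

// legacyNoopJob is a hypothetical job satisfying the old Job interface above.
type legacyNoopJob struct{}

func (legacyNoopJob) JobName() string              { return "noop" }
func (legacyNoopJob) JobType() JobType             { return JobTypeLocal }
func (legacyNoopJob) JobStart(ctx context.Context) { <-ctx.Done() }

// the adaptor makes it usable as a job.Job, just like doDaemon does for config jobs
var _ job.Job = daemonJobAdaptor{legacyNoopJob{}}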

View File

@ -1,201 +0,0 @@
package cmd
import (
"bytes"
"encoding/json"
"fmt"
"github.com/go-logfmt/logfmt"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/logger"
"time"
)
type EntryFormatter interface {
SetMetadataFlags(flags MetadataFlags)
Format(e *logger.Entry) ([]byte, error)
}
const (
FieldLevel = "level"
FieldMessage = "msg"
FieldTime = "time"
)
const (
logJobField string = "job"
logTaskField string = "task"
logSubsysField string = "subsystem"
)
type NoFormatter struct{}
func (f NoFormatter) SetMetadataFlags(flags MetadataFlags) {}
func (f NoFormatter) Format(e *logger.Entry) ([]byte, error) {
return []byte(e.Message), nil
}
type HumanFormatter struct {
metadataFlags MetadataFlags
ignoreFields map[string]bool
}
const HumanFormatterDateFormat = time.RFC3339
func (f *HumanFormatter) SetMetadataFlags(flags MetadataFlags) {
f.metadataFlags = flags
}
func (f *HumanFormatter) SetIgnoreFields(ignore []string) {
if ignore == nil {
f.ignoreFields = nil
return
}
f.ignoreFields = make(map[string]bool, len(ignore))
for _, field := range ignore {
f.ignoreFields[field] = true
}
}
func (f *HumanFormatter) ignored(field string) bool {
return f.ignoreFields != nil && f.ignoreFields[field]
}
func (f *HumanFormatter) Format(e *logger.Entry) (out []byte, err error) {
var line bytes.Buffer
if f.metadataFlags&MetadataTime != 0 {
fmt.Fprintf(&line, "%s ", e.Time.Format(HumanFormatterDateFormat))
}
if f.metadataFlags&MetadataLevel != 0 {
fmt.Fprintf(&line, "[%s]", e.Level.Short())
}
prefixFields := []string{logJobField, logTaskField, logSubsysField}
prefixed := make(map[string]bool, len(prefixFields)+2)
for _, field := range prefixFields {
val, ok := e.Fields[field].(string)
if !ok {
continue
}
if !f.ignored(field) {
fmt.Fprintf(&line, "[%s]", val)
prefixed[field] = true
}
}
if line.Len() > 0 {
fmt.Fprint(&line, ": ")
}
fmt.Fprint(&line, e.Message)
if len(e.Fields)-len(prefixed) > 0 {
fmt.Fprint(&line, " ")
enc := logfmt.NewEncoder(&line)
for field, value := range e.Fields {
if prefixed[field] || f.ignored(field) {
continue
}
if err := logfmtTryEncodeKeyval(enc, field, value); err != nil {
return nil, err
}
}
}
return line.Bytes(), nil
}
type JSONFormatter struct {
metadataFlags MetadataFlags
}
func (f *JSONFormatter) SetMetadataFlags(flags MetadataFlags) {
f.metadataFlags = flags
}
func (f *JSONFormatter) Format(e *logger.Entry) ([]byte, error) {
data := make(logger.Fields, len(e.Fields)+3)
for k, v := range e.Fields {
switch v := v.(type) {
case error:
// Otherwise errors are ignored by `encoding/json`
// https://github.com/sirupsen/logrus/issues/137
data[k] = v.Error()
default:
_, err := json.Marshal(v)
if err != nil {
return nil, errors.Errorf("field is not JSON encodable: %s", k)
}
data[k] = v
}
}
data[FieldMessage] = e.Message
data[FieldTime] = e.Time.Format(time.RFC3339)
data[FieldLevel] = e.Level
return json.Marshal(data)
}
type LogfmtFormatter struct {
metadataFlags MetadataFlags
}
func (f *LogfmtFormatter) SetMetadataFlags(flags MetadataFlags) {
f.metadataFlags = flags
}
func (f *LogfmtFormatter) Format(e *logger.Entry) ([]byte, error) {
var buf bytes.Buffer
enc := logfmt.NewEncoder(&buf)
if f.metadataFlags&MetadataTime != 0 {
enc.EncodeKeyval(FieldTime, e.Time)
}
if f.metadataFlags&MetadataLevel != 0 {
enc.EncodeKeyval(FieldLevel, e.Level)
}
// at least try and put job and task in front
prefixed := make(map[string]bool, 2)
prefix := []string{logJobField, logTaskField, logSubsysField}
for _, pf := range prefix {
v, ok := e.Fields[pf]
if !ok {
break
}
if err := logfmtTryEncodeKeyval(enc, pf, v); err != nil {
return nil, err // unlikely
}
prefixed[pf] = true
}
enc.EncodeKeyval(FieldMessage, e.Message)
for k, v := range e.Fields {
if !prefixed[k] {
if err := logfmtTryEncodeKeyval(enc, k, v); err != nil {
return nil, err
}
}
}
return buf.Bytes(), nil
}
func logfmtTryEncodeKeyval(enc *logfmt.Encoder, field, value interface{}) error {
err := enc.EncodeKeyval(field, value)
switch err {
case nil: // ok
return nil
case logfmt.ErrUnsupportedValueType:
enc.EncodeKeyval(field, fmt.Sprintf("<%T>", value))
return nil
}
return errors.Wrapf(err, "cannot encode field '%s'", field)
}
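
A sketch of feeding a hand-built entry through the human formatter; the field values are illustrative, and the logger.Entry literal assumes the exported fields accessed above:

package cmd

import (
	"fmt"
	"time"

	"github.com/zrepl/zrepl/logger"
)

// exampleHumanFormat formats a hand-built entry with time and level metadata enabled.
func exampleHumanFormat() {
	f := &HumanFormatter{}
	f.SetMetadataFlags(MetadataTime | MetadataLevel)
	e := logger.Entry{
		Level:   logger.Info,
		Message: "replication step done",
		Time:    time.Now(),
		Fields:  logger.Fields{logJobField: "fullbackup_prod1", "bytes": 12345},
	}
	out, err := f.Format(&e)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}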

View File

@ -1,161 +0,0 @@
package cmd
import (
"bytes"
"context"
"crypto/tls"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/logger"
"io"
"log/syslog"
"net"
"time"
)
type WriterOutlet struct {
Formatter EntryFormatter
Writer io.Writer
}
func (h WriterOutlet) WriteEntry(entry logger.Entry) error {
bytes, err := h.Formatter.Format(&entry)
if err != nil {
return err
}
_, err = h.Writer.Write(bytes)
h.Writer.Write([]byte("\n"))
return err
}
type TCPOutlet struct {
formatter EntryFormatter
// Specifies how much time must pass between a connection error and a reconnection attempt
// Log entries written to the outlet during this time interval are silently dropped.
connect func(ctx context.Context) (net.Conn, error)
entryChan chan *bytes.Buffer
}
func NewTCPOutlet(formatter EntryFormatter, network, address string, tlsConfig *tls.Config, retryInterval time.Duration) *TCPOutlet {
connect := func(ctx context.Context) (conn net.Conn, err error) {
deadl, ok := ctx.Deadline()
if !ok {
deadl = time.Time{}
}
dialer := net.Dialer{
Deadline: deadl,
}
if tlsConfig != nil {
conn, err = tls.DialWithDialer(&dialer, network, address, tlsConfig)
} else {
conn, err = dialer.DialContext(ctx, network, address)
}
return
}
entryChan := make(chan *bytes.Buffer, 1) // allow one message in flight while the previous one is in io.Copy()
o := &TCPOutlet{
formatter: formatter,
connect: connect,
entryChan: entryChan,
}
go o.outLoop(retryInterval)
return o
}
// FIXME: use this method
func (h *TCPOutlet) Close() {
close(h.entryChan)
}
func (h *TCPOutlet) outLoop(retryInterval time.Duration) {
var retry time.Time
var conn net.Conn
for msg := range h.entryChan {
var err error
for conn == nil {
time.Sleep(time.Until(retry))
ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(retryInterval))
conn, err = h.connect(ctx)
cancel()
if err != nil {
retry = time.Now().Add(retryInterval)
conn = nil
}
}
conn.SetWriteDeadline(time.Now().Add(retryInterval))
_, err = io.Copy(conn, msg)
if err != nil {
retry = time.Now().Add(retryInterval)
conn.Close()
conn = nil
}
}
}
func (h *TCPOutlet) WriteEntry(e logger.Entry) error {
ebytes, err := h.formatter.Format(&e)
if err != nil {
return err
}
buf := new(bytes.Buffer)
buf.Write(ebytes)
buf.WriteString("\n")
select {
case h.entryChan <- buf:
return nil
default:
return errors.New("connection broken or not fast enough")
}
}
type SyslogOutlet struct {
Formatter EntryFormatter
RetryInterval time.Duration
writer *syslog.Writer
lastConnectAttempt time.Time
}
func (o *SyslogOutlet) WriteEntry(entry logger.Entry) error {
bytes, err := o.Formatter.Format(&entry)
if err != nil {
return err
}
s := string(bytes)
if o.writer == nil {
now := time.Now()
if now.Sub(o.lastConnectAttempt) < o.RetryInterval {
return nil // not an error toward logger
}
o.writer, err = syslog.New(syslog.LOG_LOCAL0, "zrepl")
o.lastConnectAttempt = time.Now()
if err != nil {
o.writer = nil
return err
}
}
switch entry.Level {
case logger.Debug:
return o.writer.Debug(s)
case logger.Info:
return o.writer.Info(s)
case logger.Warn:
return o.writer.Warning(s)
case logger.Error:
return o.writer.Err(s)
default:
return o.writer.Err(s) // write as error as reaching this case is in fact an error
}
}
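
These outlets compose with the logger package the same way the test command later in this commit wires them up; a minimal sketch:

package cmd

import (
	"os"
	"time"

	"github.com/zrepl/zrepl/logger"
)

// exampleStdoutLogger builds a Logger that writes human-formatted entries to stdout.
func exampleStdoutLogger() Logger {
	out := logger.NewOutlets()
	out.Add(WriterOutlet{&HumanFormatter{}, os.Stdout}, logger.Info)
	return logger.NewLogger(out, 1*time.Second)
}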

View File

@ -1,43 +0,0 @@
// zrepl replicates ZFS filesystems & volumes between pools
//
// Code Organization
//
// The cmd package uses github.com/spf13/cobra for its CLI.
//
// It combines the other packages in the zrepl project to implement zrepl functionality.
//
// Each subcommand's code is in the corresponding *.go file.
// All other *.go files contain code shared by the subcommands.
package cmd
import (
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/logger"
)
//
//type Logger interface {
// Printf(format string, v ...interface{})
//}
type Logger = logger.Logger
var RootCmd = &cobra.Command{
Use: "zrepl",
Short: "ZFS dataset replication",
Long: `Replicate ZFS filesystems & volumes between pools:
- push & pull mode
- automatic snapshot creation & pruning
- local / over the network
- ACLs instead of blank SSH access`,
}
var rootArgs struct {
configFile string
}
func init() {
//cobra.OnInitialize(initConfig)
RootCmd.PersistentFlags().StringVar(&rootArgs.configFile, "config", "", "config file path")
}

View File

@ -1,123 +0,0 @@
package cmd
import (
"context"
"fmt"
"github.com/zrepl/zrepl/zfs"
"time"
)
type Pruner struct {
Now time.Time
DryRun bool
DatasetFilter zfs.DatasetFilter
PrunePolicy PrunePolicy
}
type PruneResult struct {
Filesystem *zfs.DatasetPath
All []zfs.FilesystemVersion
Keep []zfs.FilesystemVersion
Remove []zfs.FilesystemVersion
}
func (p *Pruner) filterFilesystems(ctx context.Context) (filesystems []*zfs.DatasetPath, stop bool) {
filesystems, err := zfs.ZFSListMapping(p.DatasetFilter)
if err != nil {
getLogger(ctx).WithError(err).Error("error applying filesystem filter")
return nil, true
}
if len(filesystems) <= 0 {
getLogger(ctx).Info("no filesystems matching filter")
return nil, true
}
return filesystems, false
}
func (p *Pruner) filterVersions(ctx context.Context, fs *zfs.DatasetPath) (fsversions []zfs.FilesystemVersion, stop bool) {
log := getLogger(ctx).WithField("fs", fs.ToString())
filter := AnyFSVFilter{}
fsversions, err := zfs.ZFSListFilesystemVersions(fs, filter)
if err != nil {
log.WithError(err).Error("error listing filesytem versions")
return nil, true
}
if len(fsversions) == 0 {
log.Info("no filesystem versions matching prefix")
return nil, true
}
return fsversions, false
}
func (p *Pruner) pruneFilesystem(ctx context.Context, fs *zfs.DatasetPath) (r PruneResult, valid bool) {
log := getLogger(ctx).WithField("fs", fs.ToString())
fsversions, stop := p.filterVersions(ctx, fs)
if stop {
return
}
keep, remove, err := p.PrunePolicy.Prune(fs, fsversions)
if err != nil {
log.WithError(err).Error("error evaluating prune policy")
return
}
log.WithField("fsversions", fsversions).
WithField("keep", keep).
WithField("remove", remove).
Debug("prune policy debug dump")
r = PruneResult{fs, fsversions, keep, remove}
makeFields := func(v zfs.FilesystemVersion) (fields map[string]interface{}) {
fields = make(map[string]interface{})
fields["version"] = v.ToAbsPath(fs)
timeSince := p.Now.Sub(v.Creation)
fields["age_ns"] = timeSince
const day time.Duration = 24 * time.Hour
days := timeSince / day
remainder := timeSince % day
fields["age_str"] = fmt.Sprintf("%dd%s", days, remainder)
return
}
for _, v := range remove {
fields := makeFields(v)
log.WithFields(fields).Info("destroying version")
// echo what we'll do and exec zfs destroy if not dry run
// TODO special handling for EBUSY (zfs hold)
// TODO error handling for clones? just echo to cli, skip over, and exit with non-zero status code (we're idempotent)
if !p.DryRun {
err := zfs.ZFSDestroyFilesystemVersion(fs, v)
if err != nil {
log.WithFields(fields).WithError(err).Error("error destroying version")
}
}
}
return r, true
}
func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) {
if p.DryRun {
getLogger(ctx).Info("doing dry run")
}
filesystems, stop := p.filterFilesystems(ctx)
if stop {
return
}
r = make([]PruneResult, 0, len(filesystems))
for _, fs := range filesystems {
res, ok := p.pruneFilesystem(ctx, fs)
if ok {
r = append(r, res)
}
}
return
}
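
A sketch of a dry-run invocation; the filter and policy are assumed to come from a parsed job config:

package cmd

import (
	"context"
	"time"

	"github.com/zrepl/zrepl/zfs"
)

// examplePruneDryRun only logs what would be destroyed.
func examplePruneDryRun(ctx context.Context, filter zfs.DatasetFilter, policy PrunePolicy) ([]PruneResult, error) {
	p := Pruner{
		Now:           time.Now(),
		DryRun:        true,
		DatasetFilter: filter,
		PrunePolicy:   policy,
	}
	return p.Run(ctx)
}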

View File

@ -1,28 +0,0 @@
jobs:
- name: mirror_local
type: local
# snapshot the filesystems matched by the left-hand-side of the mapping
# every 10m with zrepl_ as prefix
mapping: {
"zroot/var/db<": "storage/backups/local/zroot/var/db",
"zroot/usr/home<": "storage/backups/local/zroot/usr/home",
"zroot/usr/home/paranoid": "!", #don't backup paranoid user
"zroot/poudriere/ports<": "!", #don't backup the ports trees
}
snapshot_prefix: zrepl_
interval: 10m
# keep one hour of 10m interval snapshots of filesystems matched by
# the left-hand-side of the mapping
prune_lhs:
policy: grid
grid: 1x1h(keep=all)
keep_bookmarks: all
# follow a grandfathering scheme for filesystems on the right-hand-side of the mapping
prune_rhs:
policy: grid
grid: 1x1h(keep=all) | 24x1h | 35x1d | 6x30d

View File

@ -1,26 +0,0 @@
jobs:
- name: fullbackup_prod1
type: pull
# connect to remote using ssh / stdinserver command
connect:
type: ssh+stdinserver
host: prod1.example.com
user: root
port: 22
identity_file: /root/.ssh/id_ed25519
# pull (=ask for new snapshots) every 10m, prune afterwards
# this will leave us at most 10m behind production
interval: 10m
# pull all offered filesystems to storage/backups/zrepl/pull/prod1.example.com
mapping: {
"<":"storage/backups/zrepl/pull/prod1.example.com"
}
# follow a grandfathering scheme for filesystems on the right-hand-side of the mapping
snapshot_prefix: zrepl_
prune:
policy: grid
grid: 1x1h(keep=all) | 24x1h | 35x1d | 6x30d

View File

@ -1,47 +0,0 @@
global:
serve:
stdinserver:
# Directory where AF_UNIX sockets for stdinserver command are placed.
#
# `zrepl stdinserver CLIENT_IDENTITY`
# * connects to the socket in $sockdir/CLIENT_IDENTITY
# * sends its stdin / stdout file descriptors to the `zrepl daemon` process (see cmsg(3))
# * does nothing more
#
# This enables a setup where `zrepl daemon` is not directly exposed to the internet
# but instead all traffic is tunnelled through SSH.
# The server with the source job has an authorized_keys file entry for the public key
# used by the corresponding pull job
#
# command="/mnt/zrepl stdinserver CLIENT_IDENTITY" ssh-ed25519 AAAAC3NzaC1E... zrepl@pullingserver
#
# Below is the default value.
sockdir: /var/run/zrepl/stdinserver
jobs:
- name: fullbackup_prod1
# expect remote to connect via ssh+stdinserver with fullbackup_prod1 as client_identity
type: source
serve:
type: stdinserver # see global.serve.stdinserver for explanation
client_identity: fullbackup_prod1
# snapshot these filesystems every 10m with zrepl_ as prefix
filesystems: {
"zroot/var/db<": "ok",
"zroot/usr/home<": "ok",
"zroot/var/tmp": "!", #don't backup /tmp
}
snapshot_prefix: zrepl_
interval: 10m
# keep 1 hour of snapshots (6 at 10m interval)
# and one day of bookmarks in case pull doesn't work (link down, etc)
# => keep_bookmarks = 24h / interval = 24h / 10m = 144
prune:
policy: grid
grid: 1x1h(keep=all)
keep_bookmarks: 144

View File

@ -1,20 +0,0 @@
jobs:
- name: fullbackup_prod1
# expect remote to connect via ssh+stdinserver with fullbackup_prod1 as client_identity
type: push-sink
serve:
type: stdinserver
client_identity: fullbackup_prod1
# map all pushed datasets to storage/backups/zrepl/sink/prod1.example.com
mapping: {
"<":"storage/backups/zrepl/sink/prod1.example.com"
}
# follow a grandfathering scheme for filesystems on the right-hand-side of the mapping
prune:
policy: grid
grid: 1x1h(keep=all) | 24x1h | 35x1d | 6x30d

View File

@ -1,26 +0,0 @@
jobs:
- name: fullbackup_prod1
# connect to remote using ssh / stdinserver command
type: push
connect:
type: ssh+stdinserver
host: prod1.example.com
user: root
port: 22
identity_file: /root/.ssh/id_ed25519
# snapshot these datasets every 10m with zrepl_ as prefix
filesystems: {
"zroot/var/db<": "ok",
"zroot/usr/home<": "!",
}
snapshot_prefix: zrepl_
interval: 10m
# keep a one-day window of 10m-interval snapshots in case push doesn't work (link down, etc)
# (we cannot keep more than one day because this host will run out of disk space)
prune:
policy: grid
grid: 1x1d(keep=all)

View File

@ -1,33 +0,0 @@
global:
serve:
stdinserver:
sockdir: /var/run/zrepl/stdinserver
jobs:
- name: debian2_pull
# JOB DEBUGGING OPTIONS
# should be equal for all job types, but each job implements the debugging itself
# => consult job documentation for supported options
debug:
conn: # debug the io.ReadWriteCloser connection
read_dump: /tmp/connlog_read # dump results of Read() invocations to this file
write_dump: /tmp/connlog_write # dump results of Write() invocations to this file
rpc: # debug the RPC protocol implementation
log: true # log output from rpc layer to the job log
# ... just to make the unit tests pass.
# check the other examples, e.g. localbackup or pullbackup, for what the stuff below means
type: source
serve:
type: stdinserver
client_identity: debian2
filesystems: {
"pool1/db<": ok
}
snapshot_prefix: zrepl_
interval: 1s
prune:
policy: grid
grid: 1x10s(keep=all)
keep_bookmarks: all

View File

@ -1,19 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIDIzCCAgsCAQEwDQYJKoZIhvcNAQELBQAwWTELMAkGA1UEBhMCQVUxEzARBgNV
BAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0
ZDESMBAGA1UEAwwJbG9nc2VydmVyMB4XDTE3MDkyNDEyMzAzNloXDTE3MTAyNDEy
MzAzNlowVjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNV
BAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAwwGY2xpZW50MIIB
IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAt/xJTUlqApeJGzRD+w2J8sZS
Bo+s+04T987L/M6gaCo8aDSTEb/ZH3XSoU5JEmO6kPpwNNapOsaEhTCjndZQdm5F
uqiUtAg1uW0HCkBEIDkGr9bFHDKzpewGmmMgfQ2+hfiBR/4ZCrc/vd9P0W9BiWQS
Dtc7p22XraWPVL8HlSz5K/Ih+V6i8O+kBltZkusiJh2bWPoRp/netiTZuc6du+Wp
kpWp1OBaTU4GXIAlLj5afF14BBphRQK983Yhaz53BkA7OQ76XxowynMjmuLQVGmK
f1R9zEJuohTX9XIr1tp/ueRHcS4Awk6LcNZUMCV6270FNSIw2f4hbOZvep+t2wID
AQABMA0GCSqGSIb3DQEBCwUAA4IBAQACK3OeNzScpiNwz/jpg/usQzvXbZ/wDvml
YLjtzn/A65ox8a8BhxvH1ydyoCM2YAGYX7+y7qXJnMgRO/v8565CQIVcznHhg9ST
3828/WqZ3bXf2DV5GxKKQf7hPmBnyVUUhn/Ny91MECED27lZucWiX/bczN8ffDeh
M3+ngezcJxsOBd4x0gLrqIJCoaFRSeepOaFEW6GHQ8loxE9GmA7FQd2phIpJHFSd
Z7nQl7X5C1iN2OboEApJHwtmNVC45UlOpg53vo2sDTLhSfdogstiWi8x1HmvhIGM
j3XHs0Illvo9OwVrmgUph8zQ7pvr/AFrTOIbhgzl/9uVUk5ApwFM
-----END CERTIFICATE-----

View File

@ -1,16 +0,0 @@
-----BEGIN CERTIFICATE REQUEST-----
MIICmzCCAYMCAQAwVjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx
ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAwwGY2xp
ZW50MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAt/xJTUlqApeJGzRD
+w2J8sZSBo+s+04T987L/M6gaCo8aDSTEb/ZH3XSoU5JEmO6kPpwNNapOsaEhTCj
ndZQdm5FuqiUtAg1uW0HCkBEIDkGr9bFHDKzpewGmmMgfQ2+hfiBR/4ZCrc/vd9P
0W9BiWQSDtc7p22XraWPVL8HlSz5K/Ih+V6i8O+kBltZkusiJh2bWPoRp/netiTZ
uc6du+WpkpWp1OBaTU4GXIAlLj5afF14BBphRQK983Yhaz53BkA7OQ76XxowynMj
muLQVGmKf1R9zEJuohTX9XIr1tp/ueRHcS4Awk6LcNZUMCV6270FNSIw2f4hbOZv
ep+t2wIDAQABoAAwDQYJKoZIhvcNAQELBQADggEBAKnlr0Qs5KYF85u2YA7DJ5pL
HwAx+qNoNbox5CS1aynrDBpDTWLaErviUJ+4WxRlRyTMEscMOIOKajbYhqqFmtGZ
mu3SshZnFihErw8TOQMyU1LGGG+l6r+6ve5TciwJRLla2Y75z7izr6cyvQNRWdLr
PvxL1/Yqr8LKha12+7o28R4SLf6/GY0GcedqoebRmtuwA/jES0PuGauEUD5lH4cj
Me8sqRrB+IMHQ5j8hlJX4DbA8UQRUBL64sHkQzeQfWu+qkWmS5I19CFfLNrcH+OV
yhyjGfN0q0jHyHdpckBhgzS7IIdo6P66AIlm4qpHM7Scra3JaGM7oaZPamJ6f8U=
-----END CERTIFICATE REQUEST-----

View File

@ -1,28 +0,0 @@
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC3/ElNSWoCl4kb
NEP7DYnyxlIGj6z7ThP3zsv8zqBoKjxoNJMRv9kfddKhTkkSY7qQ+nA01qk6xoSF
MKOd1lB2bkW6qJS0CDW5bQcKQEQgOQav1sUcMrOl7AaaYyB9Db6F+IFH/hkKtz+9
30/Rb0GJZBIO1zunbZetpY9UvweVLPkr8iH5XqLw76QGW1mS6yImHZtY+hGn+d62
JNm5zp275amSlanU4FpNTgZcgCUuPlp8XXgEGmFFAr3zdiFrPncGQDs5DvpfGjDK
cyOa4tBUaYp/VH3MQm6iFNf1civW2n+55EdxLgDCTotw1lQwJXrbvQU1IjDZ/iFs
5m96n63bAgMBAAECggEAF4om0sWe06ARwbJJNFjCGpa3LfG5/xk5Qs5pmPnS2iD1
Q5veaTnzjKvlfA/pF3o9B4mTS59fXY7Cq8vSU0J1XwGy2DPzeqlGPmgtq2kXjkvd
iCfhZj8ybvsoyR3/rSBSDRADcnOXPqC9fgyRSMmESBDOoql1D3HdIzF4ii46ySIU
/XQvExS6NWifbP+Ue6DETV8NhreO5PqjeXLITQhhndtc8MDL/8eCNOyN8XjYIWKX
smlBYtRQYOOY9BHOQgUn6yvPHrtKJNKci+qcQNvWir66mBhY1o40MH5wTIV+8yP2
Vbm/VzoNKIYgeROsilBW7QTwGvkDn3R11zeTqfUNSQKBgQD0eFzhJAEZi4uBw6Tg
NKmBC5Y1IHPOsb5gKPNz9Z9j4qYRDySgYl6ISk+2EdhgUCo1NmTk8EIPQjIerUVf
S+EogFnpsj8U9LR3OM79DaGkNULxrHqhd209/g8DtVgk7yjkxL4vmVOv8qpHMp/7
eWsylN7AOxj2RB/eXYQBPrw+jQKBgQDAqae9HasLmvpJ9ktTv30yZSKXC+LP4A0D
RBBmx410VpPd4CvcpCJxXmjer6B7+9L1xHYP2pvsnMBid5i0knuvyK28dYy7fldl
CzWvb+lqNA5YYPFXQED4oEdihlQczoI1Bm06SFizeAKD1Q9e2c+lgbR/51j8xuXi
twvhMj/YBwKBgQCZw97/iQrcC2Zq7yiUEOuQjD4lGk1c83U/vGIsTJC9XcCAOFsc
OeMlrD/oz96d7a4unBDn4qpaOJOXsfpRT0PGmrxy/jcpMiUUW/ntNpa11v5NTeQw
DRL8DAFbnsNbL8Yz5f+Nps35fBNYBuKTZLJlNTfKByHTO9QjpAQ0WEZEvQKBgQCi
Ovm83EuYVSKmvxcE6Tyx/8lVqTOO2Vn7wweQlD4/lVujvE0S2L8L+XSS9w5K+GzW
eFz10p3zarbw80YJ30L5bSEmjVE43BUZR4woMzM4M6dUsiTm1HshIE2b4ALZ0uZ/
Ye794ceXL9nmSrVLqFsaQZLNFPCwwYb4FiyRry9lZwKBgAO9VbWcN8SEeBDKo3z8
yRbRTc6sI+AdKY44Dfx0tqOPmTjO3mE4X1GU4sbfD2Bvg3DdjwTuxxC/jHaKu0GG
dTM0CbrZGbDAj7E87SOcN/PWEeBckSvuQq5H3DQfwIpTmlS1l5oZn9CxRGbLqC2G
ifnel8XWUG0ROybsr1tk4mzW
-----END PRIVATE KEY-----

View File

@ -1,21 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIDiDCCAnCgAwIBAgIJALhp/WvTQeg/MA0GCSqGSIb3DQEBCwUAMFkxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQxEjAQBgNVBAMMCWxvZ3NlcnZlcjAeFw0xNzA5MjQxMjI3
MDRaFw0yNzA5MjIxMjI3MDRaMFkxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21l
LVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxEjAQBgNV
BAMMCWxvZ3NlcnZlcjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKs3
TLYfXhV3hap71tOkhPQlM+m0EKRAo8Nua50Cci5UhDo4JkVpyYok1h+NFkqmjU2b
IiIuGvsZZPOWYjbWWnSJE4+n5pBFBzcfNQ4d8xVxjANImFn6Tcehhj0WkbDIv/Ge
364XUgywS7u3EGQj/FO7vZ8KHlUxBHNuPIOPHftwIVRyleh5K32UyBaSpSmnqGos
rvI1byMuznavcZpOs4vlebZ+Jy6a20iKf9fj/0f0t0O+F5x3JIk07D3zSywhJ4RM
M0mGIUmYXbh2SMh+f61KDZLDANpz/pMAPbUJe0mxEtBf0tnwK1gEqc3SLwA0EwiM
8Hnn2iaH5Ln20UE3LOkCAwEAAaNTMFEwHQYDVR0OBBYEFDXoDcwx9SngzZcRYCeP
BplBecfiMB8GA1UdIwQYMBaAFDXoDcwx9SngzZcRYCePBplBecfiMA8GA1UdEwEB
/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBADyNvs4AA91x3gurQb1pcPVhK6nR
mkYSTN1AsDKSRi/X2iCUmR7G7FlF7XW8mntTpHvVzcs+gr94WckH5wqEOA5iZnaw
PXUWexmdXUge4hmC2q6kBQ5e2ykhSJMRVZXvOLZOZV9qitceamHESV1cKZSNMvZM
aCSVA1RK61/nUzs04pVp5PFPv9gFxJp9ki39FYFdsgZmM5RZ5I/FqxxvTJzu4RnH
VPjsMopzARYwJw6dV2bKdFSYOE8B/Vs3Yv0GxjrABw2ko4PkBPTjLIz22x6+Hd9r
K9BQi4pVmQfvppF5+SORSftlHSS+N47b0DD1rW1f5R6QGi71dFuJGikOwvY=
-----END CERTIFICATE-----

View File

@ -1,28 +0,0 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCrN0y2H14Vd4Wq
e9bTpIT0JTPptBCkQKPDbmudAnIuVIQ6OCZFacmKJNYfjRZKpo1NmyIiLhr7GWTz
lmI21lp0iROPp+aQRQc3HzUOHfMVcYwDSJhZ+k3HoYY9FpGwyL/xnt+uF1IMsEu7
txBkI/xTu72fCh5VMQRzbjyDjx37cCFUcpXoeSt9lMgWkqUpp6hqLK7yNW8jLs52
r3GaTrOL5Xm2ficumttIin/X4/9H9LdDvhecdySJNOw980ssISeETDNJhiFJmF24
dkjIfn+tSg2SwwDac/6TAD21CXtJsRLQX9LZ8CtYBKnN0i8ANBMIjPB559omh+S5
9tFBNyzpAgMBAAECggEBAIY8ZwJq+WKvQLb3POjWFf8so9TY/ispGrwAeJKy9j5o
uPrERw0o8YBDfTVjclS43BQ6Srqtly3DLSjlgL8ps+WmCxYYN2ZpGE0ZRIl65bis
O2/fnML+wbiAZTTD2xnVatfPDeP6GLQmDFpyHoHEzPIBQZvNXRbBxZGSnhMvQ/x7
FhqSBQG4kf3b1XDCENIbFEVOBOCg7WtMiIgjEGS7QnW3I65/Zt+Ts1LXRZbz+6na
Gmi0PGHA/oLUh1NRzsF4zuZn6fFzja5zw4mkt+JvCWEoxg1QhRAxRp6QQwmZ6MIc
1rw1D4Z+c5UEKyqHeIwZj4M6UNPhCfTXVm47c9eSiGECgYEA4U8pB+7eRo2fqX0C
nWsWMcmsULJvwplQnUSFenUayPn3E8ammS/ZBHksoKhj82vwIdDbtS1hQZn8Bzsi
atc8au0wz0YRDcVDzHX4HknXVQayHtP/FTPeSr5hwpoY8vhEbySuxBTBkXCrp4dx
u5ErfOiYEP3Q1ZvPRywelrATu20CgYEAwonV5dgOcen/4oAirlnvufc2NfqhAQwJ
FJ/JSVMAcXxPYu3sZMv0dGWrX8mLc+P1+XMCuV/7eBM/vU2LbDzmpeUV8sJfB2jw
wyKqKXZwBgeq60btriA4f+0ElwRGgU2KSiniUuuTX2JmyftFQx4cVAQRCFk27NY0
09psSsYyre0CgYBo6unabdtH029EB5iOIW3GZXk+Yrk0TxyA/4WAjsOYTv5FUT4H
G4bdVGf5sDBLDDpYJOAKsEUXvVLlMx5FzlCuIiGWg7QxS2jU7yJJSG1jhKixPlsM
Toj3GUyAyC1SB1Ymw1g2qsuwpFzquGG3zFQJ6G3Xi7oRnmqZY+wik3+8yQKBgB11
SdKYOPe++2SNCrNkIw0CBk9+OEs0S1u4Jn7X9sU4kbzlUlqhF89YZe8HUfqmlmTD
qbHwet/f6lL8HxSw1Cxi2EP+cu1oUqz53tKQgL4pAxTFlNA9SND2Ty+fEh4aY8p/
NSphSduzxuTnC8HyGVAPnZSqDcsnVLCP7r4T7TCxAoGAbJygkkk/gZ9pT4fZoIaq
8CMR8FTfxtkwCuZsWccSMUOWtx9nqet3gbCpKHfyoYZiKB4ke+lnUz4uFS16Y3hG
kN0hFfvfoNa8eB2Ox7vs60cMMfWJac0H7KSaDDy+EvbhE2KtQADT0eWxMyhzGR8p
5CbIivB0QCjeQIA8dOQpE8E=
-----END PRIVATE KEY-----

View File

@ -1,28 +0,0 @@
global:
logging:
- outlet: stdout
level: warn
format: human
- outlet: tcp
level: debug
format: json
net: tcp
address: 127.0.0.1:8080
retry_interval: 1s
tls: # if not specified, use plain TCP
ca: sampleconf/random/logging/logserver.crt
cert: sampleconf/random/logging/client.crt
key: sampleconf/random/logging/client.key
- outlet: syslog
level: debug
format: logfmt
monitoring:
- type: prometheus
listen: ':9090'
jobs: []

View File

@ -1,55 +0,0 @@
package cmd
import (
"os"
"context"
"github.com/problame/go-netssh"
"github.com/spf13/cobra"
"log"
"path"
)
var StdinserverCmd = &cobra.Command{
Use: "stdinserver CLIENT_IDENTITY",
Short: "start in stdinserver mode (from authorized_keys file)",
Run: cmdStdinServer,
}
func init() {
RootCmd.AddCommand(StdinserverCmd)
}
func cmdStdinServer(cmd *cobra.Command, args []string) {
// NOTE: the netssh proxying protocol requires exiting with non-zero status if anything goes wrong
defer os.Exit(1)
log := log.New(os.Stderr, "", log.LUTC|log.Ldate|log.Ltime)
conf, err := ParseConfig(rootArgs.configFile)
if err != nil {
log.Printf("error parsing config: %s", err)
return
}
if len(args) != 1 || args[0] == "" {
log.Print("must specify client_identity as positional argument")
return
}
identity := args[0]
unixaddr := path.Join(conf.Global.Serve.Stdinserver.SockDir, identity)
log.Printf("proxying client identity '%s' to zrepl daemon '%s'", identity, unixaddr)
ctx := netssh.ContextWithLog(context.TODO(), log)
err = netssh.Proxy(ctx, unixaddr)
if err == nil {
log.Print("proxying finished successfully, exiting with status 0")
os.Exit(0)
}
log.Printf("error proxying: %s", err)
}

View File

@ -1,214 +0,0 @@
package cmd
import (
"os"
"bytes"
"context"
"fmt"
"sort"
"strings"
"github.com/kr/pretty"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/zfs"
"time"
)
var testCmd = &cobra.Command{
Use: "test",
Short: "test configuration",
PersistentPreRun: testCmdGlobalInit,
}
var testCmdGlobal struct {
log Logger
conf *Config
}
var testConfigSyntaxCmd = &cobra.Command{
Use: "config",
Short: "parse config file and dump parsed datastructure",
Run: doTestConfig,
}
var testDatasetMapFilter = &cobra.Command{
Use: "pattern jobname test/zfs/dataset/path",
Short: "test dataset mapping / filter specified in config",
Example: ` zrepl test pattern my_pull_job tank/tmp`,
Run: doTestDatasetMapFilter,
}
var testPrunePolicyArgs struct {
side PrunePolicySide
showKept bool
showRemoved bool
}
var testPrunePolicyCmd = &cobra.Command{
Use: "prune jobname",
Short: "do a dry-run of the pruning part of a job",
Run: doTestPrunePolicy,
}
func init() {
RootCmd.AddCommand(testCmd)
testCmd.AddCommand(testConfigSyntaxCmd)
testCmd.AddCommand(testDatasetMapFilter)
testPrunePolicyCmd.Flags().VarP(&testPrunePolicyArgs.side, "side", "s", "prune_lhs (left) or prune_rhs (right)")
testPrunePolicyCmd.Flags().BoolVar(&testPrunePolicyArgs.showKept, "kept", false, "show kept snapshots")
testPrunePolicyCmd.Flags().BoolVar(&testPrunePolicyArgs.showRemoved, "removed", true, "show removed snapshots")
testCmd.AddCommand(testPrunePolicyCmd)
}
func testCmdGlobalInit(cmd *cobra.Command, args []string) {
out := logger.NewOutlets()
out.Add(WriterOutlet{&NoFormatter{}, os.Stdout}, logger.Info)
log := logger.NewLogger(out, 1*time.Second)
testCmdGlobal.log = log
var err error
if testCmdGlobal.conf, err = ParseConfig(rootArgs.configFile); err != nil {
testCmdGlobal.log.Printf("error parsing config file: %s", err)
os.Exit(1)
}
}
func doTestConfig(cmd *cobra.Command, args []string) {
log, conf := testCmdGlobal.log, testCmdGlobal.conf
log.Printf("config ok")
log.Printf("%# v", pretty.Formatter(conf))
return
}
func doTestDatasetMapFilter(cmd *cobra.Command, args []string) {
log, conf := testCmdGlobal.log, testCmdGlobal.conf
if len(args) != 2 {
log.Printf("specify job name as first postitional argument, test input as second")
log.Printf(cmd.UsageString())
os.Exit(1)
}
n, i := args[0], args[1]
jobi, err := conf.LookupJob(n)
if err != nil {
log.Printf("%s", err)
os.Exit(1)
}
var mf *DatasetMapFilter
switch j := jobi.(type) {
case *PullJob:
mf = j.Mapping
case *SourceJob:
mf = j.Filesystems
case *LocalJob:
mf = j.Mapping
default:
panic("incomplete implementation")
}
ip, err := zfs.NewDatasetPath(i)
if err != nil {
log.Printf("cannot parse test input as ZFS dataset path: %s", err)
os.Exit(1)
}
if mf.filterMode {
pass, err := mf.Filter(ip)
if err != nil {
log.Printf("error evaluating filter: %s", err)
os.Exit(1)
}
log.Printf("filter result: %v", pass)
} else {
res, err := mf.Map(ip)
if err != nil {
log.Printf("error evaluating mapping: %s", err)
os.Exit(1)
}
toStr := "NO MAPPING"
if res != nil {
toStr = res.ToString()
}
log.Printf("%s => %s", ip.ToString(), toStr)
}
}
func doTestPrunePolicy(cmd *cobra.Command, args []string) {
log, conf := testCmdGlobal.log, testCmdGlobal.conf
if cmd.Flags().NArg() != 1 {
log.Printf("specify job name as first positional argument")
log.Printf(cmd.UsageString())
os.Exit(1)
}
jobname := cmd.Flags().Arg(0)
jobi, err := conf.LookupJob(jobname)
if err != nil {
log.Printf("%s", err)
os.Exit(1)
}
jobp, ok := jobi.(PruningJob)
if !ok {
log.Printf("job doesn't do any prunes")
os.Exit(0)
}
log.Printf("job dump:\n%s", pretty.Sprint(jobp))
pruner, err := jobp.Pruner(testPrunePolicyArgs.side, true)
if err != nil {
log.Printf("cannot create test pruner: %s", err)
os.Exit(1)
}
log.Printf("start pruning")
ctx := WithLogger(context.Background(), log)
result, err := pruner.Run(ctx)
if err != nil {
log.Printf("error running pruner: %s", err)
os.Exit(1)
}
sort.Slice(result, func(i, j int) bool {
return strings.Compare(result[i].Filesystem.ToString(), result[j].Filesystem.ToString()) == -1
})
var b bytes.Buffer
for _, r := range result {
fmt.Fprintf(&b, "%s\n", r.Filesystem.ToString())
if testPrunePolicyArgs.showKept {
fmt.Fprintf(&b, "\tkept:\n")
for _, v := range r.Keep {
fmt.Fprintf(&b, "\t- %s\n", v.Name)
}
}
if testPrunePolicyArgs.showRemoved {
fmt.Fprintf(&b, "\tremoved:\n")
for _, v := range r.Remove {
fmt.Fprintf(&b, "\t- %s\n", v.Name)
}
}
}
log.Printf("pruning result:\n%s", b.String())
}

View File

@ -1,21 +0,0 @@
package cmd
import (
"fmt"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/version"
)
var versionCmd = &cobra.Command{
Use: "version",
Short: "print version of zrepl binary (for running daemon 'zrepl control version' command)",
Run: doVersion,
}
func init() {
RootCmd.AddCommand(versionCmd)
}
func doVersion(cmd *cobra.Command, args []string) {
fmt.Println(version.NewZreplVersionInformation().String())
}