diff --git a/cmd/adaptors.go b/cmd/adaptors.go deleted file mode 100644 index acf1a0d..0000000 --- a/cmd/adaptors.go +++ /dev/null @@ -1,103 +0,0 @@ -package cmd - -import ( - "context" - "fmt" - "io" - "net" - "strings" - "time" - - "github.com/problame/go-streamrpc" - "github.com/zrepl/zrepl/logger" - "github.com/zrepl/zrepl/util" -) - -type logNetConnConnecter struct { - streamrpc.Connecter - ReadDump, WriteDump string -} - -var _ streamrpc.Connecter = logNetConnConnecter{} - -func (l logNetConnConnecter) Connect(ctx context.Context) (net.Conn, error) { - conn, err := l.Connecter.Connect(ctx) - if err != nil { - return nil, err - } - return util.NewNetConnLogger(conn, l.ReadDump, l.WriteDump) -} - -type logListenerFactory struct { - ListenerFactory - ReadDump, WriteDump string -} - -var _ ListenerFactory = logListenerFactory{} - -type logListener struct { - net.Listener - ReadDump, WriteDump string -} - -var _ net.Listener = logListener{} - -func (m logListenerFactory) Listen() (net.Listener, error) { - l, err := m.ListenerFactory.Listen() - if err != nil { - return nil, err - } - return logListener{l, m.ReadDump, m.WriteDump}, nil -} - -func (l logListener) Accept() (net.Conn, error) { - conn, err := l.Listener.Accept() - if err != nil { - return nil, err - } - return util.NewNetConnLogger(conn, l.ReadDump, l.WriteDump) -} - -type netsshAddr struct{} - -func (netsshAddr) Network() string { return "netssh" } -func (netsshAddr) String() string { return "???" } - -type netsshConnToNetConnAdatper struct { - io.ReadWriteCloser // works for both netssh.SSHConn and netssh.ServeConn -} - -func (netsshConnToNetConnAdatper) LocalAddr() net.Addr { return netsshAddr{} } - -func (netsshConnToNetConnAdatper) RemoteAddr() net.Addr { return netsshAddr{} } - -func (netsshConnToNetConnAdatper) SetDeadline(t time.Time) error { return nil } - -func (netsshConnToNetConnAdatper) SetReadDeadline(t time.Time) error { return nil } - -func (netsshConnToNetConnAdatper) SetWriteDeadline(t time.Time) error { return nil } - -type streamrpcLogAdaptor = twoClassLogAdaptor -type replicationLogAdaptor = twoClassLogAdaptor - -type twoClassLogAdaptor struct { - logger.Logger -} - -var _ streamrpc.Logger = twoClassLogAdaptor{} - -func (a twoClassLogAdaptor) Errorf(fmtStr string, args ...interface{}) { - const errorSuffix = ": %s" - if len(args) == 1 { - if err, ok := args[0].(error); ok && strings.HasSuffix(fmtStr, errorSuffix) { - msg := strings.TrimSuffix(fmtStr, errorSuffix) - a.WithError(err).Error(msg) - return - } - } - a.Logger.Error(fmt.Sprintf(fmtStr, args...)) -} - -func (a twoClassLogAdaptor) Infof(fmtStr string, args ...interface{}) { - a.Logger.Info(fmt.Sprintf(fmtStr, args...)) -} diff --git a/cmd/autosnap.go b/cmd/autosnap.go deleted file mode 100644 index 983513f..0000000 --- a/cmd/autosnap.go +++ /dev/null @@ -1,192 +0,0 @@ -package cmd - -import ( - "context" - "fmt" - "github.com/zrepl/zrepl/zfs" - "sort" - "time" -) - -type IntervalAutosnap struct { - DatasetFilter zfs.DatasetFilter - Prefix string - SnapshotInterval time.Duration -} - -func (a *IntervalAutosnap) filterFilesystems(ctx context.Context) (fss []*zfs.DatasetPath, stop bool) { - fss, err := zfs.ZFSListMapping(a.DatasetFilter) - stop = err != nil - if err != nil { - getLogger(ctx).WithError(err).Error("cannot list datasets") - } - if len(fss) == 0 { - getLogger(ctx).Warn("no filesystem matching filesystem filter") - } - return fss, stop -} - -func (a *IntervalAutosnap) findSyncPoint(log Logger, fss []*zfs.DatasetPath) (syncPoint time.Time, err error) { - type snapTime struct { - ds *zfs.DatasetPath - time time.Time - } - - if len(fss) == 0 { - return time.Now(), nil - } - - snaptimes := make([]snapTime, 0, len(fss)) - - now := time.Now() - - log.Debug("examine filesystem state") - for _, d := range fss { - - l := log.WithField("fs", d.ToString()) - - fsvs, err := zfs.ZFSListFilesystemVersions(d, NewPrefixFilter(a.Prefix)) - if err != nil { - l.WithError(err).Error("cannot list filesystem versions") - continue - } - if len(fsvs) <= 0 { - l.WithField("prefix", a.Prefix).Info("no filesystem versions with prefix") - continue - } - - // Sort versions by creation - sort.SliceStable(fsvs, func(i, j int) bool { - return fsvs[i].CreateTXG < fsvs[j].CreateTXG - }) - - latest := fsvs[len(fsvs)-1] - l.WithField("creation", latest.Creation). - Debug("found latest snapshot") - - since := now.Sub(latest.Creation) - if since < 0 { - l.WithField("snapshot", latest.Name). - WithField("creation", latest.Creation). - Error("snapshot is from the future") - continue - } - next := now - if since < a.SnapshotInterval { - next = latest.Creation.Add(a.SnapshotInterval) - } - snaptimes = append(snaptimes, snapTime{d, next}) - } - - if len(snaptimes) == 0 { - snaptimes = append(snaptimes, snapTime{nil, now}) - } - - sort.Slice(snaptimes, func(i, j int) bool { - return snaptimes[i].time.Before(snaptimes[j].time) - }) - - return snaptimes[0].time, nil - -} - -func (a *IntervalAutosnap) waitForSyncPoint(ctx context.Context, syncPoint time.Time) { - - const LOG_TIME_FMT string = time.ANSIC - - getLogger(ctx). - WithField("sync_point", syncPoint.Format(LOG_TIME_FMT)). - Info("wait for sync point") - - select { - case <-ctx.Done(): - getLogger(ctx).WithError(ctx.Err()).Info("context done") - return - case <-time.After(syncPoint.Sub(time.Now())): - } -} - -func (a *IntervalAutosnap) syncUpRun(ctx context.Context, didSnaps chan struct{}) (stop bool) { - fss, stop := a.filterFilesystems(ctx) - if stop { - return true - } - - syncPoint, err := a.findSyncPoint(getLogger(ctx), fss) - if err != nil { - return true - } - - a.waitForSyncPoint(ctx, syncPoint) - - getLogger(ctx).Debug("snapshot all filesystems to enable further snaps in lockstep") - a.doSnapshots(ctx, didSnaps) - return false -} - -func (a *IntervalAutosnap) Run(ctx context.Context, didSnaps chan struct{}) { - - log := getLogger(ctx) - - if a.syncUpRun(ctx, didSnaps) { - log.Error("stoppping autosnap after error in sync up") - return - } - - // task drops back to idle here - - log.Debug("setting up ticker in SnapshotInterval") - ticker := time.NewTicker(a.SnapshotInterval) - for { - select { - case <-ctx.Done(): - ticker.Stop() - log.WithError(ctx.Err()).Info("context done") - return - case <-ticker.C: - a.doSnapshots(ctx, didSnaps) - } - } - -} - -func (a *IntervalAutosnap) doSnapshots(ctx context.Context, didSnaps chan struct{}) { - log := getLogger(ctx) - - // don't cache the result from previous run in case the user added - // a new dataset in the meantime - ds, stop := a.filterFilesystems(ctx) - if stop { - return - } - - // TODO channel programs -> allow a little jitter? - for _, d := range ds { - suffix := time.Now().In(time.UTC).Format("20060102_150405_000") - snapname := fmt.Sprintf("%s%s", a.Prefix, suffix) - - l := log. - WithField("fs", d.ToString()). - WithField("snapname", snapname) - - l.Info("create snapshot") - err := zfs.ZFSSnapshot(d, snapname, false) - if err != nil { - l.WithError(err).Error("cannot create snapshot") - } - - l.Info("create corresponding bookmark") - err = zfs.ZFSBookmark(d, snapname, snapname) - if err != nil { - l.WithError(err).Error("cannot create bookmark") - } - - } - - select { - case didSnaps <- struct{}{}: - default: - log.Error("warning: callback channel is full, discarding") - } - -} diff --git a/cmd/bashcomp.go b/cmd/bashcomp.go deleted file mode 100644 index 3b39252..0000000 --- a/cmd/bashcomp.go +++ /dev/null @@ -1,29 +0,0 @@ -package cmd - -import ( - "fmt" - - "github.com/spf13/cobra" - "os" -) - -var bashcompCmd = &cobra.Command{ - Use: "bashcomp path/to/out/file", - Short: "generate bash completions", - Run: func(cmd *cobra.Command, args []string) { - if len(args) != 1 { - fmt.Fprintf(os.Stderr, "specify exactly one positional agument\n") - cmd.Usage() - os.Exit(1) - } - if err := RootCmd.GenBashCompletionFile(args[0]); err != nil { - fmt.Fprintf(os.Stderr, "error generating bash completion: %s", err) - os.Exit(1) - } - }, - Hidden: true, -} - -func init() { - RootCmd.AddCommand(bashcompCmd) -} diff --git a/cmd/config.go b/cmd/config.go deleted file mode 100644 index 69ce086..0000000 --- a/cmd/config.go +++ /dev/null @@ -1,94 +0,0 @@ -package cmd - -import ( - "net" - - "fmt" - "github.com/pkg/errors" - "github.com/zrepl/zrepl/zfs" -) - -type Config struct { - Global Global - Jobs map[string]Job -} - -func (c *Config) LookupJob(name string) (j Job, err error) { - j, ok := c.Jobs[name] - if !ok { - return nil, errors.Errorf("job '%s' is not defined", name) - } - return j, nil -} - -type Global struct { - Serve struct { - Stdinserver struct { - SockDir string - } - } - Control struct { - Sockpath string - } - logging *LoggingConfig -} - -type JobDebugSettings struct { - Conn struct { - ReadDump string `mapstructure:"read_dump"` - WriteDump string `mapstructure:"write_dump"` - } - RPC struct { - Log bool - } -} - -type ListenerFactory interface { - Listen() (net.Listener, error) -} - -type SSHStdinServerConnectDescr struct { -} - -type PrunePolicy interface { - // Prune filters versions and decide which to keep and which to remove. - // Prune **does not** implement the actual removal of the versions. - Prune(fs *zfs.DatasetPath, versions []zfs.FilesystemVersion) (keep, remove []zfs.FilesystemVersion, err error) -} - -type PruningJob interface { - Pruner(side PrunePolicySide, dryRun bool) (Pruner, error) -} - -// A type for constants describing different prune policies of a PruningJob -// This is mostly a special-case for LocalJob, which is the only job that has two prune policies -// instead of one. -// It implements github.com/spf13/pflag.Value to be used as CLI flag for the test subcommand -type PrunePolicySide string - -const ( - PrunePolicySideDefault PrunePolicySide = "" - PrunePolicySideLeft PrunePolicySide = "left" - PrunePolicySideRight PrunePolicySide = "right" -) - -func (s *PrunePolicySide) String() string { - return string(*s) -} - -func (s *PrunePolicySide) Set(news string) error { - p := PrunePolicySide(news) - switch p { - case PrunePolicySideRight: - fallthrough - case PrunePolicySideLeft: - *s = p - default: - return errors.Errorf("must be either %s or %s", PrunePolicySideLeft, PrunePolicySideRight) - } - return nil -} - -func (s *PrunePolicySide) Type() string { - return fmt.Sprintf("%s | %s", PrunePolicySideLeft, PrunePolicySideRight) -} diff --git a/cmd/config_connect.go b/cmd/config_connect.go deleted file mode 100644 index ea71ac3..0000000 --- a/cmd/config_connect.go +++ /dev/null @@ -1,119 +0,0 @@ -package cmd - -import ( - "crypto/tls" - "net" - - "context" - "github.com/jinzhu/copier" - "github.com/pkg/errors" - "github.com/problame/go-netssh" - "github.com/problame/go-streamrpc" - "github.com/zrepl/zrepl/cmd/tlsconf" - "github.com/zrepl/zrepl/config" - "time" -) - -type SSHStdinserverConnecter struct { - Host string - User string - Port uint16 - IdentityFile string - TransportOpenCommand []string - SSHCommand string - Options []string - dialTimeout time.Duration -} - -var _ streamrpc.Connecter = &SSHStdinserverConnecter{} - -func parseSSHStdinserverConnecter(in config.SSHStdinserverConnect) (c *SSHStdinserverConnecter, err error) { - - c = &SSHStdinserverConnecter{ - Host: in.Host, - User: in.User, - Port: in.Port, - IdentityFile: in.IdentityFile, - SSHCommand: in.SSHCommand, - Options: in.Options, - dialTimeout: in.DialTimeout, - } - return - -} - -type netsshConnToConn struct{ *netssh.SSHConn } - -var _ net.Conn = netsshConnToConn{} - -func (netsshConnToConn) SetDeadline(dl time.Time) error { return nil } -func (netsshConnToConn) SetReadDeadline(dl time.Time) error { return nil } -func (netsshConnToConn) SetWriteDeadline(dl time.Time) error { return nil } - -func (c *SSHStdinserverConnecter) Connect(dialCtx context.Context) (net.Conn, error) { - - var endpoint netssh.Endpoint - if err := copier.Copy(&endpoint, c); err != nil { - return nil, errors.WithStack(err) - } - dialCtx, dialCancel := context.WithTimeout(dialCtx, c.dialTimeout) // context.TODO tied to error handling below - defer dialCancel() - nconn, err := netssh.Dial(dialCtx, endpoint) - if err != nil { - if err == context.DeadlineExceeded { - err = errors.Errorf("dial_timeout of %s exceeded", c.dialTimeout) - } - return nil, err - } - return netsshConnToConn{nconn}, nil -} - -type TCPConnecter struct { - Address string - dialer net.Dialer -} - -func parseTCPConnecter(in config.TCPConnect) (*TCPConnecter, error) { - dialer := net.Dialer{ - Timeout: in.DialTimeout, - } - - return &TCPConnecter{in.Address, dialer}, nil -} - -func (c *TCPConnecter) Connect(dialCtx context.Context) (conn net.Conn, err error) { - return c.dialer.DialContext(dialCtx, "tcp", c.Address) -} - -type TLSConnecter struct { - Address string - dialer net.Dialer - tlsConfig *tls.Config -} - -func parseTLSConnecter(in config.TLSConnect) (*TLSConnecter, error) { - dialer := net.Dialer{ - Timeout: in.DialTimeout, - } - - ca, err := tlsconf.ParseCAFile(in.Ca) - if err != nil { - return nil, errors.Wrap(err, "cannot parse ca file") - } - - cert, err := tls.LoadX509KeyPair(in.Cert, in.Key) - if err != nil { - return nil, errors.Wrap(err, "cannot parse cert/key pair") - } - - tlsConfig, err := tlsconf.ClientAuthClient(in.ServerCN, ca, cert) - if err != nil { - return nil, errors.Wrap(err, "cannot build tls config") - } - - return &TLSConnecter{in.Address, dialer, tlsConfig}, nil -} - -func (c *TLSConnecter) Connect(dialCtx context.Context) (conn net.Conn, err error) { - return tls.DialWithDialer(&c.dialer, "tcp", c.Address, c.tlsConfig) -} diff --git a/cmd/config_fsvfilter.go b/cmd/config_fsvfilter.go deleted file mode 100644 index fc8839d..0000000 --- a/cmd/config_fsvfilter.go +++ /dev/null @@ -1,42 +0,0 @@ -package cmd - -import ( - "github.com/pkg/errors" - "github.com/zrepl/zrepl/zfs" - "strings" -) - -type AnyFSVFilter struct{} - -func (AnyFSVFilter) Filter(t zfs.VersionType, name string) (accept bool, err error) { - return true, nil -} - -type PrefixFilter struct { - prefix string - fstype zfs.VersionType - fstypeSet bool // optionals anyone? -} - -func NewPrefixFilter(prefix string) *PrefixFilter { - return &PrefixFilter{prefix: prefix} -} - -func NewTypedPrefixFilter(prefix string, versionType zfs.VersionType) *PrefixFilter { - return &PrefixFilter{prefix, versionType, true} -} - -func parseSnapshotPrefix(i string) (p string, err error) { - if len(i) <= 0 { - err = errors.Errorf("snapshot prefix must not be empty string") - return - } - p = i - return -} - -func (f *PrefixFilter) Filter(t zfs.VersionType, name string) (accept bool, err error) { - fstypeMatches := (!f.fstypeSet || t == f.fstype) - prefixMatches := strings.HasPrefix(name, f.prefix) - return fstypeMatches && prefixMatches, nil -} diff --git a/cmd/config_job_local.go b/cmd/config_job_local.go deleted file mode 100644 index 8bf4fa3..0000000 --- a/cmd/config_job_local.go +++ /dev/null @@ -1,185 +0,0 @@ -package cmd - -import ( - "time" - - "context" - "github.com/mitchellh/mapstructure" - "github.com/pkg/errors" - "github.com/zrepl/zrepl/config" - "github.com/zrepl/zrepl/endpoint" - "github.com/zrepl/zrepl/replication" - "github.com/zrepl/zrepl/zfs" - "sync" -) - -type LocalJob struct { - Name string - Mapping *DatasetMapFilter - SnapshotPrefix string - Interval time.Duration - PruneLHS PrunePolicy - PruneRHS PrunePolicy - Debug JobDebugSettings -} - -func parseLocalJob(c config.Global, in source.LocalJob) (j *LocalJob, err error) { - - var asMap struct { - Mapping map[string]string - SnapshotPrefix string `mapstructure:"snapshot_prefix"` - Interval string - InitialReplPolicy string `mapstructure:"initial_repl_policy"` - PruneLHS map[string]interface{} `mapstructure:"prune_lhs"` - PruneRHS map[string]interface{} `mapstructure:"prune_rhs"` - Debug map[string]interface{} - } - - if err = mapstructure.Decode(i, &asMap); err != nil { - err = errors.Wrap(err, "mapstructure error") - return nil, err - } - - j = &LocalJob{Name: name} - - if j.Mapping, err = parseDatasetMapFilter(asMap.Mapping, false); err != nil { - return - } - - if j.SnapshotPrefix, err = parseSnapshotPrefix(asMap.SnapshotPrefix); err != nil { - return - } - - if j.Interval, err = parsePostitiveDuration(asMap.Interval); err != nil { - err = errors.Wrap(err, "cannot parse interval") - return - } - - if j.PruneLHS, err = parsePrunePolicy(asMap.PruneLHS, true); err != nil { - err = errors.Wrap(err, "cannot parse 'prune_lhs'") - return - } - if j.PruneRHS, err = parsePrunePolicy(asMap.PruneRHS, false); err != nil { - err = errors.Wrap(err, "cannot parse 'prune_rhs'") - return - } - - if err = mapstructure.Decode(asMap.Debug, &j.Debug); err != nil { - err = errors.Wrap(err, "cannot parse 'debug'") - return - } - - return -} - -func (j *LocalJob) JobName() string { - return j.Name -} - -func (j *LocalJob) JobType() JobType { return JobTypeLocal } - -func (j *LocalJob) JobStart(ctx context.Context) { - - log := getLogger(ctx) - - // Allow access to any dataset since we control what mapping - // is passed to the pull routine. - // All local datasets will be passed to its Map() function, - // but only those for which a mapping exists will actually be pulled. - // We can pay this small performance penalty for now. - wildcardMapFilter := NewDatasetMapFilter(1, false) - wildcardMapFilter.Add("<", "<") - sender := endpoint.NewSender(wildcardMapFilter, NewPrefixFilter(j.SnapshotPrefix)) - - receiver, err := endpoint.NewReceiver(j.Mapping, NewPrefixFilter(j.SnapshotPrefix)) - if err != nil { - log.WithError(err).Error("unexpected error setting up local handler") - } - - snapper := IntervalAutosnap{ - DatasetFilter: j.Mapping.AsFilter(), - Prefix: j.SnapshotPrefix, - SnapshotInterval: j.Interval, - } - - plhs, err := j.Pruner(PrunePolicySideLeft, false) - if err != nil { - log.WithError(err).Error("error creating lhs pruner") - return - } - prhs, err := j.Pruner(PrunePolicySideRight, false) - if err != nil { - log.WithError(err).Error("error creating rhs pruner") - return - } - - didSnaps := make(chan struct{}) - go snapper.Run(WithLogger(ctx, log.WithField(logSubsysField, "snap")), didSnaps) - -outer: - for { - - select { - case <-ctx.Done(): - log.WithError(ctx.Err()).Info("context") - break outer - case <-didSnaps: - log.Debug("finished taking snapshots") - log.Info("starting replication procedure") - } - - { - ctx := WithLogger(ctx, log.WithField(logSubsysField, "replication")) - rep := replication.NewReplication() - rep.Drive(ctx, sender, receiver) - } - - var wg sync.WaitGroup - - wg.Add(1) - go func() { - plhs.Run(WithLogger(ctx, log.WithField(logSubsysField, "prune_lhs"))) - wg.Done() - }() - - wg.Add(1) - go func() { - prhs.Run(WithLogger(ctx, log.WithField(logSubsysField, "prune_rhs"))) - wg.Done() - }() - - wg.Wait() - } - -} - -func (j *LocalJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) { - - var dsfilter zfs.DatasetFilter - var pp PrunePolicy - switch side { - case PrunePolicySideLeft: - pp = j.PruneLHS - dsfilter = j.Mapping.AsFilter() - case PrunePolicySideRight: - pp = j.PruneRHS - dsfilter, err = j.Mapping.InvertedFilter() - if err != nil { - err = errors.Wrap(err, "cannot invert mapping for prune_rhs") - return - } - default: - err = errors.Errorf("must be either left or right side") - return - } - - p = Pruner{ - time.Now(), - dryRun, - dsfilter, - j.SnapshotPrefix, - pp, - } - - return -} diff --git a/cmd/config_job_pull.go b/cmd/config_job_pull.go deleted file mode 100644 index f3167c5..0000000 --- a/cmd/config_job_pull.go +++ /dev/null @@ -1,168 +0,0 @@ -package cmd - -import ( - "os" - "os/signal" - "syscall" - "time" - - "context" - - "github.com/mitchellh/mapstructure" - "github.com/pkg/errors" - "github.com/problame/go-streamrpc" - "github.com/zrepl/zrepl/config" - "github.com/zrepl/zrepl/endpoint" - "github.com/zrepl/zrepl/replication" -) - -type PullJob struct { - Name string - Connect streamrpc.Connecter - Interval time.Duration - Mapping *DatasetMapFilter - // constructed from mapping during parsing - pruneFilter *DatasetMapFilter - Prune PrunePolicy - - rep *replication.Replication -} - -func parsePullJob(c config.Global, in config.PullJob) (j *PullJob, err error) { - - j = &PullJob{Name: in.Name} - - j.Connect, err = parseConnect(in.Replication.Connect) - if err != nil { - err = errors.Wrap(err, "cannot parse 'connect'") - return nil, err - } - - j.Interval = in.Replication.Interval - - j.Mapping = NewDatasetMapFilter(1, false) - if err := j.Mapping.Add("<", in.Replication.RootDataset); err != nil { - return nil, err - } - - j.pruneFilter = NewDatasetMapFilter(1, true) - if err := j.pruneFilter.Add(in.Replication.RootDataset, MapFilterResultOk); err != nil { - return nil, err - } - - if j.Prune, err = parsePrunePolicy(asMap.Prune, false); err != nil { - err = errors.Wrap(err, "cannot parse prune policy") - return - } - - if in.Debug.Conn.ReadDump != "" || j.Debug.Conn.WriteDump != "" { - logConnecter := logNetConnConnecter{ - Connecter: j.Connect, - ReadDump: in.Debug.Conn.ReadDump, - WriteDump: in.Debug.Conn.WriteDump, - } - j.Connect = logConnecter - } - - return -} - -func (j *PullJob) JobName() string { return j.Name } - -func (j *PullJob) JobStart(ctx context.Context) { - - log := getLogger(ctx) - defer log.Info("exiting") - - // j.task is idle here idle here - usr1 := make(chan os.Signal) - signal.Notify(usr1, syscall.SIGUSR1) - defer signal.Stop(usr1) - - ticker := time.NewTicker(j.Interval) - for { - begin := time.Now() - j.doRun(ctx) - duration := time.Now().Sub(begin) - if duration > j.Interval { - log. - WithField("actual_duration", duration). - WithField("configured_interval", j.Interval). - Warn("pull run took longer than configured interval") - } - select { - case <-ctx.Done(): - log.WithError(ctx.Err()).Info("context") - return - case <-ticker.C: - case <-usr1: - } - } -} - -var STREAMRPC_CONFIG = &streamrpc.ConnConfig{ // FIXME oversight and configurability - RxHeaderMaxLen: 4096, - RxStructuredMaxLen: 4096 * 4096, - RxStreamMaxChunkSize: 4096 * 4096, - TxChunkSize: 4096 * 4096, - RxTimeout: streamrpc.Timeout{ - Progress: 10 * time.Second, - }, - TxTimeout: streamrpc.Timeout{ - Progress: 10 * time.Second, - }, -} - -func (j *PullJob) doRun(ctx context.Context) { - - log := getLogger(ctx) - // FIXME - clientConf := &streamrpc.ClientConfig{ - ConnConfig: STREAMRPC_CONFIG, - } - - client, err := streamrpc.NewClient(j.Connect, clientConf) - defer client.Close() - - sender := endpoint.NewRemote(client) - - receiver, err := endpoint.NewReceiver(j.Mapping, AnyFSVFilter{}) - if err != nil { - log.WithError(err).Error("error creating receiver endpoint") - return - } - - { - ctx := replication.WithLogger(ctx, replicationLogAdaptor{log.WithField(logSubsysField, "replication")}) - ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{log.WithField(logSubsysField, "rpc")}) - ctx = endpoint.WithLogger(ctx, log.WithField(logSubsysField, "endpoint")) - j.rep = replication.NewReplication() - j.rep.Drive(ctx, sender, receiver) - } - - client.Close() - - { - ctx := WithLogger(ctx, log.WithField(logSubsysField, "prune")) - pruner, err := j.Pruner(PrunePolicySideDefault, false) - if err != nil { - log.WithError(err).Error("error creating pruner") - } else { - pruner.Run(ctx) - } - } -} - -func (j *PullJob) Report() *replication.Report { - return j.rep.Report() -} - -func (j *PullJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) { - p = Pruner{ - time.Now(), - dryRun, - j.pruneFilter, - j.Prune, - } - return -} diff --git a/cmd/config_job_source.go b/cmd/config_job_source.go deleted file mode 100644 index 8e1c283..0000000 --- a/cmd/config_job_source.go +++ /dev/null @@ -1,172 +0,0 @@ -package cmd - -import ( - "context" - "time" - - "github.com/pkg/errors" - "github.com/problame/go-streamrpc" - "github.com/zrepl/zrepl/config" - "github.com/zrepl/zrepl/endpoint" - "net" -) - -type SourceJob struct { - Name string - Serve ListenerFactory - Filesystems *DatasetMapFilter - SnapshotPrefix string - Interval time.Duration - Prune PrunePolicy -} - -func parseSourceJob(c config.Global, in config.SourceJob) (j *SourceJob, err error) { - j = &SourceJob{ - Name: in.Name, - Interval: in.Snapshotting.Interval, - } - - if j.Serve, err = parseAuthenticatedChannelListenerFactory(c, in.Replication.Serve); err != nil { - return - } - - if j.Filesystems, err = parseDatasetMapFilter(in.Replication.Filesystems, true); err != nil { - return - } - - if j.SnapshotPrefix, err = parseSnapshotPrefix(in.Snapshotting.SnapshotPrefix); err != nil { - return - } - - if j.Prune, err = parsePrunePolicy(in.Pruning, true); err != nil { - err = errors.Wrap(err, "cannot parse 'prune'") - return - } - - if in.Debug.Conn.ReadDump != "" || in.Debug.Conn.WriteDump != "" { - logServe := logListenerFactory{ - ListenerFactory: j.Serve, - ReadDump: in.Debug.Conn.ReadDump, - WriteDump: in.Debug.Conn.WriteDump, - } - j.Serve = logServe - } - - return -} - -func (j *SourceJob) JobName() string { - return j.Name -} - -func (j *SourceJob) JobType() JobType { return JobTypeSource } - -func (j *SourceJob) JobStart(ctx context.Context) { - - log := getLogger(ctx) - defer log.Info("exiting") - - a := IntervalAutosnap{j.Filesystems, j.SnapshotPrefix, j.Interval} - p, err := j.Pruner(PrunePolicySideDefault, false) - - if err != nil { - log.WithError(err).Error("error creating pruner") - return - } - - didSnaps := make(chan struct{}) - - go j.serve(ctx) // logSubsysField set by handleConnection - go a.Run(WithLogger(ctx, log.WithField(logSubsysField, "snap")), didSnaps) - -outer: - for { - select { - case <-ctx.Done(): - break outer - case <-didSnaps: - p.Run(WithLogger(ctx, log.WithField(logSubsysField, "prune"))) - } - } - log.WithError(ctx.Err()).Info("context") - -} - -func (j *SourceJob) Pruner(side PrunePolicySide, dryRun bool) (p Pruner, err error) { - p = Pruner{ - time.Now(), - dryRun, - j.Filesystems, - j.SnapshotPrefix, - j.Prune, - } - return -} - -func (j *SourceJob) serve(ctx context.Context) { - - log := getLogger(ctx) - - listener, err := j.Serve.Listen() - if err != nil { - getLogger(ctx).WithError(err).Error("error listening") - return - } - - type connChanMsg struct { - conn net.Conn - err error - } - connChan := make(chan connChanMsg, 1) - - // Serve connections until interrupted or error -outer: - for { - - go func() { - rwc, err := listener.Accept() - connChan <- connChanMsg{rwc, err} - }() - - select { - - case rwcMsg := <-connChan: - - if rwcMsg.err != nil { - log.WithError(rwcMsg.err).Error("error accepting connection") - continue - } - - j.handleConnection(ctx, rwcMsg.conn) - - case <-ctx.Done(): - log.WithError(ctx.Err()).Info("context") - break outer - } - - } - - log.Info("closing listener") - err = listener.Close() - if err != nil { - log.WithError(err).Error("error closing listener") - } - - return -} - -func (j *SourceJob) handleConnection(ctx context.Context, conn net.Conn) { - log := getLogger(ctx) - log.Info("handling client connection") - - senderEP := endpoint.NewSender(j.Filesystems, NewPrefixFilter(j.SnapshotPrefix)) - - ctx = endpoint.WithLogger(ctx, log.WithField(logSubsysField, "serve")) - ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{log.WithField(logSubsysField, "rpc")}) - handler := endpoint.NewHandler(senderEP) - if err := streamrpc.ServeConn(ctx, conn, STREAMRPC_CONFIG, handler.Handle); err != nil { - log.WithError(err).Error("error serving connection") - } else { - log.Info("client closed connection") - } -} diff --git a/cmd/config_logging.go b/cmd/config_logging.go deleted file mode 100644 index 88e8cb8..0000000 --- a/cmd/config_logging.go +++ /dev/null @@ -1,194 +0,0 @@ -package cmd - -import ( - "crypto/tls" - "crypto/x509" - "github.com/mattn/go-isatty" - "github.com/pkg/errors" - "github.com/zrepl/zrepl/cmd/tlsconf" - "github.com/zrepl/zrepl/config" - "github.com/zrepl/zrepl/logger" - "os" -) - -type LoggingConfig struct { - Outlets *logger.Outlets -} - -type MetadataFlags int64 - -const ( - MetadataTime MetadataFlags = 1 << iota - MetadataLevel - - MetadataNone MetadataFlags = 0 - MetadataAll MetadataFlags = ^0 -) - -func parseLogging(in []config.LoggingOutletEnum) (c *LoggingConfig, err error) { - - c = &LoggingConfig{} - c.Outlets = logger.NewOutlets() - - if len(in) == 0 { - // Default config - out := WriterOutlet{&HumanFormatter{}, os.Stdout} - c.Outlets.Add(out, logger.Warn) - return - } - - var syslogOutlets, stdoutOutlets int - for lei, le := range in { - - outlet, minLevel, err := parseOutlet(le) - if err != nil { - return nil, errors.Wrapf(err, "cannot parse outlet #%d", lei) - } - var _ logger.Outlet = WriterOutlet{} - var _ logger.Outlet = &SyslogOutlet{} - switch outlet.(type) { - case *SyslogOutlet: - syslogOutlets++ - case WriterOutlet: - stdoutOutlets++ - } - - c.Outlets.Add(outlet, minLevel) - - } - - if syslogOutlets > 1 { - return nil, errors.Errorf("can only define one 'syslog' outlet") - } - if stdoutOutlets > 1 { - return nil, errors.Errorf("can only define one 'stdout' outlet") - } - - return c, nil - -} - -func parseLogFormat(i interface{}) (f EntryFormatter, err error) { - var is string - switch j := i.(type) { - case string: - is = j - default: - return nil, errors.Errorf("invalid log format: wrong type: %T", i) - } - - switch is { - case "human": - return &HumanFormatter{}, nil - case "logfmt": - return &LogfmtFormatter{}, nil - case "json": - return &JSONFormatter{}, nil - default: - return nil, errors.Errorf("invalid log format: '%s'", is) - } - -} - -func parseOutlet(in config.LoggingOutletEnum) (o logger.Outlet, level logger.Level, err error) { - - parseCommon := func(common config.LoggingOutletCommon) (logger.Level, EntryFormatter, error) { - if common.Level == "" || common.Format == "" { - return 0, nil, errors.Errorf("must specify 'level' and 'format' field") - } - - minLevel, err := logger.ParseLevel(common.Level) - if err != nil { - return 0, nil, errors.Wrap(err, "cannot parse 'level' field") - } - formatter, err := parseLogFormat(common.Format) - if err != nil { - return 0, nil, errors.Wrap(err, "cannot parse 'formatter' field") - } - return minLevel, formatter, nil - } - - var f EntryFormatter - - switch v := in.Ret.(type) { - case config.StdoutLoggingOutlet: - level, f, err = parseCommon(v.LoggingOutletCommon) - if err != nil { - break - } - o, err = parseStdoutOutlet(v, f) - case config.TCPLoggingOutlet: - level, f, err = parseCommon(v.LoggingOutletCommon) - if err != nil { - break - } - o, err = parseTCPOutlet(v, f) - case config.SyslogLoggingOutlet: - level, f, err = parseCommon(v.LoggingOutletCommon) - if err != nil { - break - } - o, err = parseSyslogOutlet(v, f) - default: - panic(v) - } - return o, level, err -} - -func parseStdoutOutlet(in config.StdoutLoggingOutlet, formatter EntryFormatter) (WriterOutlet, error) { - flags := MetadataAll - writer := os.Stdout - if !isatty.IsTerminal(writer.Fd()) && !in.Time { - flags &= ^MetadataTime - } - - formatter.SetMetadataFlags(flags) - return WriterOutlet{ - formatter, - os.Stdout, - }, nil -} - -func parseTCPOutlet(in config.TCPLoggingOutlet, formatter EntryFormatter) (out *TCPOutlet, err error) { - var tlsConfig *tls.Config - if in.TLS != nil { - tlsConfig, err = func(m *config.TCPLoggingOutletTLS, host string) (*tls.Config, error) { - clientCert, err := tls.LoadX509KeyPair(m.Cert, m.Key) - if err != nil { - return nil, errors.Wrap(err, "cannot load client cert") - } - - var rootCAs *x509.CertPool - if m.CA == "" { - if rootCAs, err = x509.SystemCertPool(); err != nil { - return nil, errors.Wrap(err, "cannot open system cert pool") - } - } else { - rootCAs, err = tlsconf.ParseCAFile(m.CA) - if err != nil { - return nil, errors.Wrap(err, "cannot parse CA cert") - } - } - if rootCAs == nil { - panic("invariant violated") - } - - return tlsconf.ClientAuthClient(host, rootCAs, clientCert) - }(in.TLS, in.Address) - if err != nil { - return nil, errors.New("cannot not parse TLS config in field 'tls'") - } - } - - formatter.SetMetadataFlags(MetadataAll) - return NewTCPOutlet(formatter, in.Net, in.Address, tlsConfig, in.RetryInterval), nil - -} - -func parseSyslogOutlet(in config.SyslogLoggingOutlet, formatter EntryFormatter) (out *SyslogOutlet, err error) { - out = &SyslogOutlet{} - out.Formatter = formatter - out.Formatter.SetMetadataFlags(MetadataNone) - out.RetryInterval = in.RetryInterval - return out, nil -} diff --git a/cmd/config_mapfilter.go b/cmd/config_mapfilter.go deleted file mode 100644 index 4c8f0a7..0000000 --- a/cmd/config_mapfilter.go +++ /dev/null @@ -1,274 +0,0 @@ -package cmd - -import ( - "fmt" - "strings" - - "github.com/pkg/errors" - "github.com/zrepl/zrepl/endpoint" - "github.com/zrepl/zrepl/zfs" -) - -type DatasetMapFilter struct { - entries []datasetMapFilterEntry - - // if set, only valid filter entries can be added using Add() - // and Map() will always return an error - filterMode bool -} - -type datasetMapFilterEntry struct { - path *zfs.DatasetPath - // the mapping. since this datastructure acts as both mapping and filter - // we have to convert it to the desired rep dynamically - mapping string - subtreeMatch bool -} - -func NewDatasetMapFilter(capacity int, filterMode bool) *DatasetMapFilter { - return &DatasetMapFilter{ - entries: make([]datasetMapFilterEntry, 0, capacity), - filterMode: filterMode, - } -} - -func (m *DatasetMapFilter) Add(pathPattern, mapping string) (err error) { - - if m.filterMode { - if _, err = m.parseDatasetFilterResult(mapping); err != nil { - return - } - } - - // assert path glob adheres to spec - const SUBTREE_PATTERN string = "<" - patternCount := strings.Count(pathPattern, SUBTREE_PATTERN) - switch { - case patternCount > 1: - case patternCount == 1 && !strings.HasSuffix(pathPattern, SUBTREE_PATTERN): - err = fmt.Errorf("pattern invalid: only one '<' at end of string allowed") - return - } - - pathStr := strings.TrimSuffix(pathPattern, SUBTREE_PATTERN) - path, err := zfs.NewDatasetPath(pathStr) - if err != nil { - return fmt.Errorf("pattern is not a dataset path: %s", err) - } - - entry := datasetMapFilterEntry{ - path: path, - mapping: mapping, - subtreeMatch: patternCount > 0, - } - m.entries = append(m.entries, entry) - return - -} - -// find the most specific prefix mapping we have -// -// longer prefix wins over shorter prefix, direct wins over glob -func (m DatasetMapFilter) mostSpecificPrefixMapping(path *zfs.DatasetPath) (idx int, found bool) { - lcp, lcp_entry_idx := -1, -1 - direct_idx := -1 - for e := range m.entries { - entry := m.entries[e] - ep := m.entries[e].path - lep := ep.Length() - - switch { - case !entry.subtreeMatch && ep.Equal(path): - direct_idx = e - continue - case entry.subtreeMatch && path.HasPrefix(ep) && lep > lcp: - lcp = lep - lcp_entry_idx = e - default: - continue - } - } - - if lcp_entry_idx >= 0 || direct_idx >= 0 { - found = true - switch { - case direct_idx >= 0: - idx = direct_idx - case lcp_entry_idx >= 0: - idx = lcp_entry_idx - } - } - return -} - -// Returns target == nil if there is no mapping -func (m DatasetMapFilter) Map(source *zfs.DatasetPath) (target *zfs.DatasetPath, err error) { - - if m.filterMode { - err = fmt.Errorf("using a filter for mapping simply does not work") - return - } - - mi, hasMapping := m.mostSpecificPrefixMapping(source) - if !hasMapping { - return nil, nil - } - me := m.entries[mi] - - if me.mapping == "" { - // Special case treatment: 'foo/bar<' => '' - if !me.subtreeMatch { - return nil, fmt.Errorf("mapping to '' must be a subtree match") - } - // ok... - } else { - if strings.HasPrefix("!", me.mapping) { - // reject mapping - return nil, nil - } - } - - target, err = zfs.NewDatasetPath(me.mapping) - if err != nil { - err = fmt.Errorf("mapping target is not a dataset path: %s", err) - return - } - if me.subtreeMatch { - // strip common prefix ('<' wildcards are no special case here) - extendComps := source.Copy() - extendComps.TrimPrefix(me.path) - target.Extend(extendComps) - } - return -} - -func (m DatasetMapFilter) Filter(p *zfs.DatasetPath) (pass bool, err error) { - - if !m.filterMode { - err = fmt.Errorf("using a mapping as a filter does not work") - return - } - - mi, hasMapping := m.mostSpecificPrefixMapping(p) - if !hasMapping { - pass = false - return - } - me := m.entries[mi] - pass, err = m.parseDatasetFilterResult(me.mapping) - return -} - -// Construct a new filter-only DatasetMapFilter from a mapping -// The new filter allows excactly those paths that were not forbidden by the mapping. -func (m DatasetMapFilter) InvertedFilter() (inv *DatasetMapFilter, err error) { - - if m.filterMode { - err = errors.Errorf("can only invert mappings") - return - } - - inv = &DatasetMapFilter{ - make([]datasetMapFilterEntry, len(m.entries)), - true, - } - - for i, e := range m.entries { - inv.entries[i].path, err = zfs.NewDatasetPath(e.mapping) - if err != nil { - err = errors.Wrapf(err, "mapping cannot be inverted: '%s' is not a dataset path: %s", e.mapping) - return - } - inv.entries[i].mapping = MapFilterResultOk - inv.entries[i].subtreeMatch = e.subtreeMatch - } - - return inv, nil -} - -// FIXME investigate whether we can support more... -func (m DatasetMapFilter) Invert() (endpoint.FSMap, error) { - - if m.filterMode { - return nil, errors.Errorf("can only invert mappings") - } - - if len(m.entries) != 1 { - return nil, errors.Errorf("inversion of complicated mappings is not implemented") // FIXME - } - - e := m.entries[0] - - inv := &DatasetMapFilter{ - make([]datasetMapFilterEntry, len(m.entries)), - false, - } - mp, err := zfs.NewDatasetPath(e.mapping) - if err != nil { - return nil, err - } - - inv.entries[0] = datasetMapFilterEntry{ - path: mp, - mapping: e.path.ToString(), - subtreeMatch: e.subtreeMatch, - } - - return inv, nil -} - -// Creates a new DatasetMapFilter in filter mode from a mapping -// All accepting mapping results are mapped to accepting filter results -// All rejecting mapping results are mapped to rejecting filter results -func (m DatasetMapFilter) AsFilter() endpoint.FSFilter { - - f := &DatasetMapFilter{ - make([]datasetMapFilterEntry, len(m.entries)), - true, - } - - for i, e := range m.entries { - var newe datasetMapFilterEntry = e - if strings.HasPrefix(newe.mapping, "!") { - newe.mapping = MapFilterResultOmit - } else { - newe.mapping = MapFilterResultOk - } - f.entries[i] = newe - } - - return f -} - -const ( - MapFilterResultOk string = "ok" - MapFilterResultOmit string = "!" -) - -// Parse a dataset filter result -func (m DatasetMapFilter) parseDatasetFilterResult(result string) (pass bool, err error) { - l := strings.ToLower(result) - if l == MapFilterResultOk { - return true, nil - } - if l == MapFilterResultOmit { - return false, nil - } - return false, fmt.Errorf("'%s' is not a valid filter result", result) -} - -func parseDatasetMapFilterFilesystems(in map[string]bool) (f *DatasetMapFilter, err error) { - - f = NewDatasetMapFilter(len(in), true) - for pathPattern, accept := range in { - mapping := MapFilterResultOmit - if accept { - mapping = MapFilterResultOk - } - if err = f.Add(pathPattern, mapping); err != nil { - err = fmt.Errorf("invalid mapping entry ['%s':'%s']: %s", pathPattern, mapping, err) - return - } - } - return -} diff --git a/cmd/config_parse.go b/cmd/config_parse.go deleted file mode 100644 index 3e72072..0000000 --- a/cmd/config_parse.go +++ /dev/null @@ -1,211 +0,0 @@ -package cmd - -import ( - "io/ioutil" - - "fmt" - "github.com/go-yaml/yaml" - "github.com/mitchellh/mapstructure" - "github.com/pkg/errors" - "github.com/problame/go-streamrpc" - "github.com/zrepl/zrepl/cmd/pruning/retentiongrid" - "github.com/zrepl/zrepl/config" - "os" -) - -var ConfigFileDefaultLocations []string = []string{ - "/etc/zrepl/zrepl.yml", - "/usr/local/etc/zrepl/zrepl.yml", -} - -const ( - JobNameControl string = "control" -) - -var ReservedJobNames []string = []string{ - JobNameControl, -} - -type ConfigParsingContext struct { - Global *Global -} - -func ParseConfig(path string) (config *Config, err error) { - - if path == "" { - // Try default locations - for _, l := range ConfigFileDefaultLocations { - stat, err := os.Stat(l) - if err != nil { - continue - } - if !stat.Mode().IsRegular() { - err = errors.Errorf("file at default location is not a regular file: %s", l) - continue - } - path = l - break - } - } - - var i interface{} - - var bytes []byte - - if bytes, err = ioutil.ReadFile(path); err != nil { - err = errors.WithStack(err) - return - } - - if err = yaml.Unmarshal(bytes, &i); err != nil { - err = errors.WithStack(err) - return - } - - return parseConfig(i) -} - -func parseConfig(i interface{}) (c *Config, err error) { - - var asMap struct { - Global map[string]interface{} - Jobs []map[string]interface{} - } - if err := mapstructure.Decode(i, &asMap); err != nil { - return nil, errors.Wrap(err, "config root must be a dict") - } - - c = &Config{} - - // Parse global with defaults - c.Global.Serve.Stdinserver.SockDir = "/var/run/zrepl/stdinserver" - c.Global.Control.Sockpath = "/var/run/zrepl/control" - - err = mapstructure.Decode(asMap.Global, &c.Global) - if err != nil { - err = errors.Wrap(err, "mapstructure error on 'global' section: %s") - return - } - - if c.Global.logging, err = parseLogging(asMap.Global["logging"]); err != nil { - return nil, errors.Wrap(err, "cannot parse logging section") - } - - cpc := ConfigParsingContext{&c.Global} - jpc := JobParsingContext{cpc} - c.Jobs = make(map[string]Job, len(asMap.Jobs)) - - // FIXME internal jobs should not be mixed with user jobs - // Monitoring Jobs - var monJobs []map[string]interface{} - if err := mapstructure.Decode(asMap.Global["monitoring"], &monJobs); err != nil { - return nil, errors.Wrap(err, "cannot parse monitoring section") - } - for i, jc := range monJobs { - if jc["name"] == "" || jc["name"] == nil { - // FIXME internal jobs should not require a name... - jc["name"] = fmt.Sprintf("prometheus-%d", i) - } - job, err := parseJob(jpc, jc) - if err != nil { - return nil, errors.Wrapf(err, "cannot parse monitoring job #%d", i) - } - if job.JobType() != JobTypePrometheus { - return nil, errors.Errorf("monitoring job #%d has invalid job type", i) - } - c.Jobs[job.JobName()] = job - } - - // Regular Jobs - for i := range asMap.Jobs { - job, err := parseJob(jpc, asMap.Jobs[i]) - if err != nil { - // Try to find its name - namei, ok := asMap.Jobs[i]["name"] - if !ok { - namei = fmt.Sprintf("", i) - } - err = errors.Wrapf(err, "cannot parse job '%v'", namei) - return nil, err - } - jn := job.JobName() - if _, ok := c.Jobs[jn]; ok { - err = errors.Errorf("duplicate or invalid job name: %s", jn) - return nil, err - } - c.Jobs[job.JobName()] = job - } - - return c, nil - -} - -type JobParsingContext struct { - ConfigParsingContext -} - -func parseJob(c config.Global, in config.JobEnum) (j Job, err error) { - - switch v := in.Ret.(type) { - case config.PullJob: - return parsePullJob(c, v) - case config.SourceJob: - return parseSourceJob(c, v) - case config.LocalJob: - return parseLocalJob(c, v) - default: - panic(fmt.Sprintf("implementation error: unknown job type %s", v)) - } - -} - -func parseConnect(in config.ConnectEnum) (c streamrpc.Connecter, err error) { - switch v := in.Ret.(type) { - case config.SSHStdinserverConnect: - return parseSSHStdinserverConnecter(v) - case config.TCPConnect: - return parseTCPConnecter(v) - case config.TLSConnect: - return parseTLSConnecter(v) - default: - panic(fmt.Sprintf("unknown connect type %v", v)) - } -} - -func parsePruning(in []config.PruningEnum, willSeeBookmarks bool) (p Pruner, err error) { - - policies := make([]PrunePolicy, len(in)) - for i := range in { - if policies[i], err = parseKeepRule(in[i]); err != nil { - return nil, errors.Wrapf(err, "invalid keep rule #%d:", i) - } - } - -} - -func parseKeepRule(in config.PruningEnum) (p PrunePolicy, err error) { - switch v := in.Ret.(type) { - case config.PruneGrid: - return retentiongrid.ParseGridPrunePolicy(v, willSeeBookmarks) - //case config.PruneKeepLastN: - //case config.PruneKeepRegex: - //case config.PruneKeepNotReplicated: - default: - panic(fmt.Sprintf("unknown keep rule type %v", v)) - } -} - -func parseAuthenticatedChannelListenerFactory(c config.Global, in config.ServeEnum) (p ListenerFactory, err error) { - - switch v := in.Ret.(type) { - case config.StdinserverServer: - return parseStdinserverListenerFactory(c, v) - case config.TCPServe: - return parseTCPListenerFactory(c, v) - case config.TLSServe: - return parseTLSListenerFactory(c, v) - default: - panic(fmt.Sprintf("unknown listener type %v", v)) - } - -} diff --git a/cmd/config_prune_none.go b/cmd/config_prune_none.go deleted file mode 100644 index 785941b..0000000 --- a/cmd/config_prune_none.go +++ /dev/null @@ -1,11 +0,0 @@ -package cmd - -import "github.com/zrepl/zrepl/zfs" - -type NoPrunePolicy struct{} - -func (p NoPrunePolicy) Prune(fs *zfs.DatasetPath, versions []zfs.FilesystemVersion) (keep, remove []zfs.FilesystemVersion, err error) { - keep = versions - remove = []zfs.FilesystemVersion{} - return -} diff --git a/cmd/config_serve_stdinserver.go b/cmd/config_serve_stdinserver.go deleted file mode 100644 index 1fef93c..0000000 --- a/cmd/config_serve_stdinserver.go +++ /dev/null @@ -1,58 +0,0 @@ -package cmd - -import ( - "github.com/problame/go-netssh" - "github.com/zrepl/zrepl/cmd/helpers" - "github.com/zrepl/zrepl/config" - "net" - "path" -) - -type StdinserverListenerFactory struct { - ClientIdentity string - sockpath string -} - -func parseStdinserverListenerFactory(c config.Global, in config.StdinserverServer) (f *StdinserverListenerFactory, err error) { - - f = &StdinserverListenerFactory{ - ClientIdentity: in.ClientIdentity, - } - - f.sockpath = path.Join(c.Serve.StdinServer.SockDir, f.ClientIdentity) - - return -} - -func (f *StdinserverListenerFactory) Listen() (net.Listener, error) { - - if err := helpers.PreparePrivateSockpath(f.sockpath); err != nil { - return nil, err - } - - l, err := netssh.Listen(f.sockpath) - if err != nil { - return nil, err - } - return StdinserverListener{l}, nil -} - -type StdinserverListener struct { - l *netssh.Listener -} - -func (l StdinserverListener) Addr() net.Addr { - return netsshAddr{} -} - -func (l StdinserverListener) Accept() (net.Conn, error) { - c, err := l.l.Accept() - if err != nil { - return nil, err - } - return netsshConnToNetConnAdatper{c}, nil -} - -func (l StdinserverListener) Close() (err error) { - return l.l.Close() -} diff --git a/cmd/config_serve_tcp.go b/cmd/config_serve_tcp.go deleted file mode 100644 index 7afafd6..0000000 --- a/cmd/config_serve_tcp.go +++ /dev/null @@ -1,25 +0,0 @@ -package cmd - -import ( - "github.com/zrepl/zrepl/config" - "net" - "time" -) - -type TCPListenerFactory struct { - Address string -} - -func parseTCPListenerFactory(c config.Global, in config.TCPServe) (*TCPListenerFactory, error) { - - lf := &TCPListenerFactory{ - Address: in.Listen, - } - return lf, nil -} - -var TCPListenerHandshakeTimeout = 10 * time.Second // FIXME make configurable - -func (f *TCPListenerFactory) Listen() (net.Listener, error) { - return net.Listen("tcp", f.Address) -} diff --git a/cmd/config_serve_tls.go b/cmd/config_serve_tls.go deleted file mode 100644 index 83dcf16..0000000 --- a/cmd/config_serve_tls.go +++ /dev/null @@ -1,78 +0,0 @@ -package cmd - -import ( - "crypto/tls" - "crypto/x509" - "net" - "time" - - "github.com/mitchellh/mapstructure" - "github.com/pkg/errors" - "github.com/zrepl/zrepl/cmd/tlsconf" - "github.com/zrepl/zrepl/config" -) - -type TCPListenerFactory struct { - Address string - tls bool - clientCA *x509.CertPool - serverCert tls.Certificate - clientCommonName string -} - -func parseTCPListenerFactory(c config.Global, in config.TCPServe) (*TCPListenerFactory, error) { - - lf := &TCPListenerFactory{ - Address: in.Listen, - } - - if in.TLS != nil { - err := func(i map[string]interface{}) (err error) { - var in struct { - CA string - Cert string - Key string - ClientCN string `mapstructure:"client_cn"` - } - if err := mapstructure.Decode(i, &in); err != nil { - return errors.Wrap(err, "mapstructure error") - } - - if in.CA == "" || in.Cert == "" || in.Key == "" || in.ClientCN == "" { - return errors.New("fields 'ca', 'cert', 'key' and 'client_cn' must be specified") - } - - lf.clientCommonName = in.ClientCN - - lf.clientCA, err = tlsconf.ParseCAFile(in.CA) - if err != nil { - return errors.Wrap(err, "cannot parse ca file") - } - - lf.serverCert, err = tls.LoadX509KeyPair(in.Cert, in.Key) - if err != nil { - return errors.Wrap(err, "cannot parse cer/key pair") - } - - lf.tls = true // mark success - return nil - }(in.TLS) - if err != nil { - return nil, errors.Wrap(err, "error parsing TLS config in field 'tls'") - } - } - - return lf, nil -} - -var TCPListenerHandshakeTimeout = 10 * time.Second // FIXME make configurable - -func (f *TCPListenerFactory) Listen() (net.Listener, error) { - l, err := net.Listen("tcp", f.Address) - if !f.tls || err != nil { - return l, err - } - - tl := tlsconf.NewClientAuthListener(l, f.clientCA, f.serverCert, f.clientCommonName, TCPListenerHandshakeTimeout) - return tl, nil -} diff --git a/cmd/config_test.go b/cmd/config_test.go deleted file mode 100644 index f53453a..0000000 --- a/cmd/config_test.go +++ /dev/null @@ -1,283 +0,0 @@ -package cmd - -import ( - "testing" - "time" - - "github.com/kr/pretty" - "github.com/stretchr/testify/assert" - "github.com/zrepl/zrepl/util" - "github.com/zrepl/zrepl/zfs" -) - -func TestSampleConfigsAreParsedWithoutErrors(t *testing.T) { - - paths := []string{ - "./sampleconf/localbackup/host1.yml", - "./sampleconf/pullbackup/backuphost.yml", - "./sampleconf/pullbackup/productionhost.yml", - "./sampleconf/random/debugging.yml", - "./sampleconf/random/logging_and_monitoring.yml", - } - - for _, p := range paths { - - c, err := ParseConfig(p) - if err != nil { - t.Errorf("error parsing %s:\n%+v", p, err) - } - - t.Logf("file: %s", p) - t.Log(pretty.Sprint(c)) - - } - -} - -func TestParseRetentionGridStringParsing(t *testing.T) { - - intervals, err := parseRetentionGridIntervalsString("2x10m(keep=2) | 1x1h | 3x1w") - - assert.Nil(t, err) - assert.Len(t, intervals, 6) - proto := util.RetentionInterval{ - KeepCount: 2, - Length: 10 * time.Minute, - } - assert.EqualValues(t, proto, intervals[0]) - assert.EqualValues(t, proto, intervals[1]) - - proto.KeepCount = 1 - proto.Length = 1 * time.Hour - assert.EqualValues(t, proto, intervals[2]) - - proto.Length = 7 * 24 * time.Hour - assert.EqualValues(t, proto, intervals[3]) - assert.EqualValues(t, proto, intervals[4]) - assert.EqualValues(t, proto, intervals[5]) - - intervals, err = parseRetentionGridIntervalsString("|") - assert.Error(t, err) - intervals, err = parseRetentionGridIntervalsString("2x10m") - assert.NoError(t, err) - - intervals, err = parseRetentionGridIntervalsString("1x10m(keep=all)") - assert.NoError(t, err) - assert.Len(t, intervals, 1) - assert.EqualValues(t, util.RetentionGridKeepCountAll, intervals[0].KeepCount) - -} - -func TestDatasetMapFilter(t *testing.T) { - - expectMapping := func(m map[string]string, from, to string) { - dmf, err := parseDatasetMapFilter(m, false) - if err != nil { - t.Logf("expect test map to be valid: %s", err) - t.FailNow() - } - fromPath, err := zfs.NewDatasetPath(from) - if err != nil { - t.Logf("expect test from path to be valid: %s", err) - t.FailNow() - } - - res, err := dmf.Map(fromPath) - if to == "" { - assert.Nil(t, res) - assert.Nil(t, err) - t.Logf("%s => NOT MAPPED", fromPath.ToString()) - return - } - - assert.Nil(t, err) - toPath, err := zfs.NewDatasetPath(to) - if err != nil { - t.Logf("expect test to path to be valid: %s", err) - t.FailNow() - } - assert.True(t, res.Equal(toPath)) - } - - expectFilter := func(m map[string]string, path string, pass bool) { - dmf, err := parseDatasetMapFilter(m, true) - if err != nil { - t.Logf("expect test filter to be valid: %s", err) - t.FailNow() - } - p, err := zfs.NewDatasetPath(path) - if err != nil { - t.Logf("expect test path to be valid: %s", err) - t.FailNow() - } - res, err := dmf.Filter(p) - assert.Nil(t, err) - assert.Equal(t, pass, res) - } - - map1 := map[string]string{ - "a/b/c<": "root1", - "a/b<": "root2", - "<": "root3/b/c", - "b": "!", - "a/b/c/d/e<": "!", - "q<": "root4/1/2", - } - - expectMapping(map1, "a/b/c", "root1") - expectMapping(map1, "a/b/c/d", "root1/d") - expectMapping(map1, "a/b/c/d/e", "") - expectMapping(map1, "a/b/e", "root2/e") - expectMapping(map1, "a/b", "root2") - expectMapping(map1, "x", "root3/b/c/x") - expectMapping(map1, "x/y", "root3/b/c/x/y") - expectMapping(map1, "q", "root4/1/2") - expectMapping(map1, "b", "") - expectMapping(map1, "q/r", "root4/1/2/r") - - map2 := map[string]string{ // identity mapping - "<": "", - } - expectMapping(map2, "foo/bar", "foo/bar") - - map3 := map[string]string{ // subtree to local mapping, need that for Invert() - "foo/bar<": "", - } - { - m, _ := parseDatasetMapFilter(map3, false) - p, _ := zfs.NewDatasetPath("foo/bar") - tp, err := m.Map(p) - assert.Nil(t, err) - assert.True(t, tp.Empty()) - - expectMapping(map3, "foo/bar/x", "x") - expectMapping(map3, "x", "") - } - - filter1 := map[string]string{ - "<": "!", - "a<": "ok", - "a/b<": "!", - } - - expectFilter(filter1, "b", false) - expectFilter(filter1, "a", true) - expectFilter(filter1, "a/d", true) - expectFilter(filter1, "a/b", false) - expectFilter(filter1, "a/b/c", false) - - filter2 := map[string]string{} - expectFilter(filter2, "foo", false) // default to omit - -} - -func TestDatasetMapFilter_AsFilter(t *testing.T) { - - mapspec := map[string]string{ - "a/b/c<": "root1", - "a/b<": "root2", - "<": "root3/b/c", - "b": "!", - "a/b/c/d/e<": "!", - "q<": "root4/1/2", - } - - m, err := parseDatasetMapFilter(mapspec, false) - assert.Nil(t, err) - - f := m.AsFilter() - - t.Logf("Mapping:\n%s\nFilter:\n%s", pretty.Sprint(m), pretty.Sprint(f)) - - tf := func(f zfs.DatasetFilter, path string, pass bool) { - p, err := zfs.NewDatasetPath(path) - assert.Nil(t, err) - r, err := f.Filter(p) - assert.Nil(t, err) - assert.Equal(t, pass, r) - } - - tf(f, "a/b/c", true) - tf(f, "a/b", true) - tf(f, "b", false) - tf(f, "a/b/c/d/e", false) - tf(f, "a/b/c/d/e/f", false) - tf(f, "a", true) - -} - -func TestDatasetMapFilter_InvertedFilter(t *testing.T) { - mapspec := map[string]string{ - "a/b": "1/2", - "a/b/c<": "3", - "a/b/c/d<": "1/2/a", - "a/b/d": "!", - } - - m, err := parseDatasetMapFilter(mapspec, false) - assert.Nil(t, err) - - inv, err := m.InvertedFilter() - assert.Nil(t, err) - - t.Log(pretty.Sprint(inv)) - - expectMapping := func(m *DatasetMapFilter, ps string, expRes bool) { - p, err := zfs.NewDatasetPath(ps) - assert.Nil(t, err) - r, err := m.Filter(p) - assert.Nil(t, err) - assert.Equal(t, expRes, r) - } - - expectMapping(inv, "4", false) - expectMapping(inv, "3", true) - expectMapping(inv, "3/x", true) - expectMapping(inv, "1", false) - expectMapping(inv, "1/2", true) - expectMapping(inv, "1/2/3", false) - expectMapping(inv, "1/2/a/b", true) - -} - -func TestDatasetMapFilter_Invert(t *testing.T) { - - mapspec := map[string]string{ - "<": "foo/bar", - } - - m, err := parseDatasetMapFilter(mapspec, false) - assert.NoError(t, err) - - invI, err := m.Invert() - assert.NoError(t, err) - inv, ok := invI.(*DatasetMapFilter) - assert.True(t, ok) - - expectMapping := func(m *DatasetMapFilter, input, expect string, expErr bool, expEmpty bool) { - p, err := zfs.NewDatasetPath(input) - assert.Nil(t, err) - r, err := m.Map(p) - if expErr { - assert.Nil(t, r) - assert.Error(t, err) - return - } - if expEmpty { - assert.Nil(t, err) - assert.True(t, r.Empty()) - } else if expect == "" { - assert.Nil(t, r) - assert.Nil(t, err) - } else { - assert.Nil(t, err) - assert.NotNil(t, r) - assert.Equal(t, expect, r.ToString()) - } - } - - expectMapping(inv, "x", "", false, false) - expectMapping(inv, "foo/bar", "", false, true) - expectMapping(inv, "foo/bar/bee", "bee", false, false) - -} diff --git a/cmd/control.go b/cmd/control.go deleted file mode 100644 index 77daa44..0000000 --- a/cmd/control.go +++ /dev/null @@ -1,156 +0,0 @@ -package cmd - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "github.com/pkg/errors" - "github.com/spf13/cobra" - "github.com/zrepl/zrepl/cmd/daemon" - "github.com/zrepl/zrepl/logger" - "github.com/zrepl/zrepl/version" - "io" - golog "log" - "net" - "net/http" - "os" -) - -var controlCmd = &cobra.Command{ - Use: "control", - Short: "control zrepl daemon", -} - -var pprofCmd = &cobra.Command{ - Use: "pprof off | [on TCP_LISTEN_ADDRESS]", - Short: "start a http server exposing go-tool-compatible profiling endpoints at TCP_LISTEN_ADDRESS", - Run: doControlPProf, - PreRunE: func(cmd *cobra.Command, args []string) error { - if cmd.Flags().NArg() < 1 { - goto enargs - } - switch cmd.Flags().Arg(0) { - case "on": - pprofCmdArgs.msg.Run = true - if cmd.Flags().NArg() != 2 { - return errors.New("must specify TCP_LISTEN_ADDRESS as second positional argument") - } - pprofCmdArgs.msg.HttpListenAddress = cmd.Flags().Arg(1) - case "off": - if cmd.Flags().NArg() != 1 { - goto enargs - } - pprofCmdArgs.msg.Run = false - } - return nil - enargs: - return errors.New("invalid number of positional arguments") - - }, -} -var pprofCmdArgs struct { - msg daemon.PprofServerControlMsg -} - -var controlVersionCmd = &cobra.Command{ - Use: "version", - Short: "print version of running zrepl daemon", - Run: doControLVersionCmd, -} - -var controlStatusCmdArgs struct { - format string - level logger.Level - onlyShowJob string -} - -func init() { - RootCmd.AddCommand(controlCmd) - controlCmd.AddCommand(pprofCmd) - controlCmd.AddCommand(controlVersionCmd) - controlStatusCmdArgs.level = logger.Warn -} - -func controlHttpClient() (client http.Client, err error) { - - conf, err := ParseConfig(rootArgs.configFile) - if err != nil { - return http.Client{}, err - } - - return http.Client{ - Transport: &http.Transport{ - DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { - return net.Dial("unix", conf.Global.Control.Sockpath) - }, - }, - }, nil -} - -func doControlPProf(cmd *cobra.Command, args []string) { - - log := golog.New(os.Stderr, "", 0) - - die := func() { - log.Printf("exiting after error") - os.Exit(1) - } - - log.Printf("connecting to zrepl daemon") - httpc, err := controlHttpClient() - if err != nil { - log.Printf("error parsing config: %s", err) - die() - } - - var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(&pprofCmdArgs.msg); err != nil { - log.Printf("error marshaling request: %s", err) - die() - } - _, err = httpc.Post("http://unix"+daemon.ControlJobEndpointPProf, "application/json", &buf) - if err != nil { - log.Printf("error: %s", err) - die() - } - - log.Printf("finished") -} - -func doControLVersionCmd(cmd *cobra.Command, args []string) { - - log := golog.New(os.Stderr, "", 0) - - die := func() { - log.Printf("exiting after error") - os.Exit(1) - } - - httpc, err := controlHttpClient() - if err != nil { - log.Printf("could not connect to daemon: %s", err) - die() - } - - resp, err := httpc.Get("http://unix" + daemon.ControlJobEndpointVersion) - if err != nil { - log.Printf("error: %s", err) - die() - } else if resp.StatusCode != http.StatusOK { - var msg bytes.Buffer - io.CopyN(&msg, resp.Body, 4096) - log.Printf("error: %s", msg.String()) - die() - } - - var info version.ZreplVersionInformation - err = json.NewDecoder(resp.Body).Decode(&info) - if err != nil { - log.Printf("error unmarshaling response: %s", err) - die() - } - - fmt.Println(info.String()) - -} diff --git a/cmd/daemon.deact/control.go b/cmd/daemon.deact/control.go deleted file mode 100644 index 0aff1ce..0000000 --- a/cmd/daemon.deact/control.go +++ /dev/null @@ -1,142 +0,0 @@ -package daemon - -import ( - "bytes" - "context" - "encoding/json" - "github.com/pkg/errors" - "github.com/zrepl/zrepl/cmd/daemon/job" - "github.com/zrepl/zrepl/cmd/helpers" - "github.com/zrepl/zrepl/logger" - "github.com/zrepl/zrepl/version" - "io" - "net" - "net/http" -) - -type controlJob struct { - sockaddr *net.UnixAddr - jobs *jobs -} - -func newControlJob(sockpath string, jobs *jobs) (j *controlJob, err error) { - j = &controlJob{jobs: jobs} - - j.sockaddr, err = net.ResolveUnixAddr("unix", sockpath) - if err != nil { - err = errors.Wrap(err, "cannot resolve unix address") - return - } - - return -} - -func (j *controlJob) Name() string { return jobNameControl } - -func (j *controlJob) Status() interface{} { return nil } - -const ( - ControlJobEndpointPProf string = "/debug/pprof" - ControlJobEndpointVersion string = "/version" - ControlJobEndpointStatus string = "/status" -) - -func (j *controlJob) Run(ctx context.Context) { - - log := job.GetLogger(ctx) - defer log.Info("control job finished") - - l, err := helpers.ListenUnixPrivate(j.sockaddr) - if err != nil { - log.WithError(err).Error("error listening") - return - } - - pprofServer := NewPProfServer(ctx) - - mux := http.NewServeMux() - mux.Handle(ControlJobEndpointPProf, requestLogger{log: log, handlerFunc: func(w http.ResponseWriter, r *http.Request) { - var msg PprofServerControlMsg - err := json.NewDecoder(r.Body).Decode(&msg) - if err != nil { - log.WithError(err).Error("bad pprof request from client") - w.WriteHeader(http.StatusBadRequest) - } - pprofServer.Control(msg) - w.WriteHeader(200) - }}) - mux.Handle(ControlJobEndpointVersion, - requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) { - return version.NewZreplVersionInformation(), nil - }}}) - mux.Handle(ControlJobEndpointStatus, - requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) { - s := j.jobs.status() - return s, nil - }}}) - server := http.Server{Handler: mux} - -outer: - for { - - served := make(chan error) - go func() { - served <- server.Serve(l) - close(served) - }() - - select { - case <-ctx.Done(): - log.WithError(ctx.Err()).Info("context done") - server.Shutdown(context.Background()) - break outer - case err = <-served: - if err != nil { - log.WithError(err).Error("error serving") - break outer - } - } - - } - -} - -type jsonResponder struct { - producer func() (interface{}, error) -} - -func (j jsonResponder) ServeHTTP(w http.ResponseWriter, r *http.Request) { - res, err := j.producer() - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - io.WriteString(w, err.Error()) - return - } - var buf bytes.Buffer - err = json.NewEncoder(&buf).Encode(res) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - io.WriteString(w, err.Error()) - } else { - io.Copy(w, &buf) - } -} - -type requestLogger struct { - log logger.Logger - handler http.Handler - handlerFunc http.HandlerFunc -} - -func (l requestLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) { - log := l.log.WithField("method", r.Method).WithField("url", r.URL) - log.Info("start") - if l.handlerFunc != nil { - l.handlerFunc(w, r) - } else if l.handler != nil { - l.handler.ServeHTTP(w, r) - } else { - log.Error("no handler or handlerFunc configured") - } - log.Info("finish") -} diff --git a/cmd/daemon.deact/daemon.go b/cmd/daemon.deact/daemon.go deleted file mode 100644 index 070d4fe..0000000 --- a/cmd/daemon.deact/daemon.go +++ /dev/null @@ -1,174 +0,0 @@ -package daemon - -import ( - "context" - "github.com/zrepl/zrepl/logger" - "os" - "os/signal" - "syscall" - "time" - "github.com/zrepl/zrepl/version" - "fmt" -) - -.daesdfadsfsafjlsjfda - -import ( - "context" - "fmt" - "github.com/zrepl/zrepl/cmd/daemon/job" - "github.com/zrepl/zrepl/logger" - "github.com/zrepl/zrepl/version" - "os" - "os/signal" - "strings" - "sync" - "syscall" - "time" -) - -func Run(ctx context.Context, controlSockpath string, outlets *logger.Outlets, confJobs []job.Job) { - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-sigChan - cancel() - }() - - log := logger.NewLogger(outlets, 1*time.Second) - log.Info(version.NewZreplVersionInformation().String()) - - // parse config - for _, job := range confJobs { - if IsInternalJobName(job.Name()) { - panic(fmt.Sprintf("internal job name used for config job '%s'", job.Name())) //FIXME - } - } - - ctx = job.WithLogger(ctx, log) - - jobs := newJobs() - - // start control socket - controlJob, err := newControlJob(controlSockpath, jobs) - if err != nil { - panic(err) // FIXME - } - jobs.start(ctx, controlJob, true) - - // start prometheus - //var promJob *prometheusJob // FIXME - //jobs.start(ctx, promJob, true) - - log.Info("starting daemon") - - // start regular jobs - for _, j := range confJobs { - jobs.start(ctx, j, false) - } - - select { - case <-jobs.wait(): - log.Info("all jobs finished") - case <-ctx.Done(): - log.WithError(ctx.Err()).Info("context finished") - } - log.Info("daemon exiting") -} - -type jobs struct { - wg sync.WaitGroup - - // m protects all fields below it - m sync.RWMutex - wakeups map[string]job.WakeupChan // by JobName - jobs map[string]job.Job -} - -func newJobs() *jobs { - return &jobs{ - wakeups: make(map[string]job.WakeupChan), - jobs: make(map[string]job.Job), - } -} - -const ( - logJobField string = "job" - logTaskField string = "task" - logSubsysField string = "subsystem" -) - -func (s *jobs) wait() <-chan struct{} { - ch := make(chan struct{}) - go func() { - s.wg.Wait() - }() - return ch -} - -func (s *jobs) status() map[string]interface{} { - s.m.RLock() - defer s.m.RUnlock() - - type res struct { - name string - status interface{} - } - var wg sync.WaitGroup - c := make(chan res, len(s.jobs)) - for name, j := range s.jobs { - wg.Add(1) - go func(name string, j job.Job) { - defer wg.Done() - c <- res{name: name, status: j.Status()} - }(name, j) - } - wg.Wait() - close(c) - ret := make(map[string]interface{}, len(s.jobs)) - for res := range c { - ret[res.name] = res.status - } - return ret -} - -const ( - jobNamePrometheus = "_prometheus" - jobNameControl = "_control" -) - -func IsInternalJobName(s string) bool { - return strings.HasPrefix(s, "_") -} - -func (s *jobs) start(ctx context.Context, j job.Job, internal bool) { - s.m.Lock() - defer s.m.Unlock() - - jobLog := job.GetLogger(ctx).WithField(logJobField, j.Name()) - jobName := j.Name() - if !internal && IsInternalJobName(jobName) { - panic(fmt.Sprintf("internal job name used for non-internal job %s", jobName)) - } - if internal && !IsInternalJobName(jobName) { - panic(fmt.Sprintf("internal job does not use internal job name %s", jobName)) - } - if _, ok := s.jobs[jobName]; ok { - panic(fmt.Sprintf("duplicate job name %s", jobName)) - } - s.jobs[jobName] = j - ctx = job.WithLogger(ctx, jobLog) - ctx, wakeupChan := job.WithWakeup(ctx) - s.wakeups[jobName] = wakeupChan - - s.wg.Add(1) - go func() { - defer s.wg.Done() - jobLog.Info("starting job") - defer jobLog.Info("job exited") - j.Run(ctx) - }() -} diff --git a/cmd/daemon.deact/job/job.go b/cmd/daemon.deact/job/job.go deleted file mode 100644 index 56e25af..0000000 --- a/cmd/daemon.deact/job/job.go +++ /dev/null @@ -1,47 +0,0 @@ -package job - -import ( - "context" - "github.com/zrepl/zrepl/logger" -) - -type Logger = logger.Logger - -type contextKey int - -const ( - contextKeyLog contextKey = iota - contextKeyWakeup -) - -func GetLogger(ctx context.Context) Logger { - if l, ok := ctx.Value(contextKeyLog).(Logger); ok { - return l - } - return logger.NewNullLogger() -} - -func WithLogger(ctx context.Context, l Logger) context.Context { - return context.WithValue(ctx, contextKeyLog, l) -} - -func WithWakeup(ctx context.Context) (context.Context, WakeupChan) { - wc := make(chan struct{}, 1) - return context.WithValue(ctx, contextKeyWakeup, wc), wc -} - -type Job interface { - Name() string - Run(ctx context.Context) - Status() interface{} -} - -type WakeupChan <-chan struct{} - -func WaitWakeup(ctx context.Context) WakeupChan { - wc, ok := ctx.Value(contextKeyWakeup).(WakeupChan) - if !ok { - wc = make(chan struct{}) - } - return wc -} diff --git a/cmd/daemon.deact/pprof.go b/cmd/daemon.deact/pprof.go deleted file mode 100644 index 2296ebd..0000000 --- a/cmd/daemon.deact/pprof.go +++ /dev/null @@ -1,80 +0,0 @@ -package daemon - -import ( - "net/http" - // FIXME: importing this package has the side-effect of poisoning the http.DefaultServeMux - // FIXME: with the /debug/pprof endpoints - "context" - "net" - "net/http/pprof" -) - -type PProfServer struct { - cc chan PprofServerControlMsg - state PprofServerControlMsg - listener net.Listener -} - -type PprofServerControlMsg struct { - // Whether the server should listen for requests on the given address - Run bool - // Must be set if Run is true, undefined otherwise - HttpListenAddress string -} - -func NewPProfServer(ctx context.Context) *PProfServer { - - s := &PProfServer{ - cc: make(chan PprofServerControlMsg), - } - - go s.controlLoop(ctx) - return s -} - -func (s *PProfServer) controlLoop(ctx context.Context) { -outer: - for { - - var msg PprofServerControlMsg - select { - case <-ctx.Done(): - if s.listener != nil { - s.listener.Close() - } - break outer - case msg = <-s.cc: - // proceed - } - - var err error - if msg.Run && s.listener == nil { - - s.listener, err = net.Listen("tcp", msg.HttpListenAddress) - if err != nil { - s.listener = nil - continue - } - - // FIXME: because net/http/pprof does not provide a mux, - mux := http.NewServeMux() - mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) - mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) - mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) - mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) - mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) - go http.Serve(s.listener, mux) - continue - } - - if !msg.Run && s.listener != nil { - s.listener.Close() - s.listener = nil - continue - } - } -} - -func (s *PProfServer) Control(msg PprofServerControlMsg) { - s.cc <- msg -} diff --git a/cmd/daemon.deact/prometheus.go b/cmd/daemon.deact/prometheus.go deleted file mode 100644 index 1cef3d0..0000000 --- a/cmd/daemon.deact/prometheus.go +++ /dev/null @@ -1,82 +0,0 @@ -package daemon - -import ( - "context" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/zrepl/zrepl/cmd/daemon/job" - "github.com/zrepl/zrepl/zfs" - "net" - "net/http" -) - -type prometheusJob struct { - listen string -} - -func newPrometheusJob(listen string) *prometheusJob { - return &prometheusJob{listen} -} - -var prom struct { - taskLastActiveStart *prometheus.GaugeVec - taskLastActiveDuration *prometheus.GaugeVec - taskLogEntries *prometheus.CounterVec -} - -func init() { - prom.taskLastActiveStart = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "zrepl", - Subsystem: "daemon", - Name: "task_last_active_start", - Help: "point in time at which the job task last left idle state", - }, []string{"zrepl_job", "job_type", "task"}) - prom.taskLastActiveDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "zrepl", - Subsystem: "daemon", - Name: "task_last_active_duration", - Help: "seconds that the last run ob a job task spent between leaving and re-entering idle state", - }, []string{"zrepl_job", "job_type", "task"}) - prom.taskLogEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "zrepl", - Subsystem: "daemon", - Name: "task_log_entries", - Help: "number of log entries per job task and level", - }, []string{"zrepl_job", "job_type", "task", "level"}) - prometheus.MustRegister(prom.taskLastActiveStart) - prometheus.MustRegister(prom.taskLastActiveDuration) - prometheus.MustRegister(prom.taskLogEntries) -} - -func (j *prometheusJob) Name() string { return jobNamePrometheus } - -func (j *prometheusJob) Status() interface{} { return nil } - -func (j *prometheusJob) Run(ctx context.Context) { - - if err := zfs.PrometheusRegister(prometheus.DefaultRegisterer); err != nil { - panic(err) - } - - log := job.GetLogger(ctx) - - l, err := net.Listen("tcp", j.listen) - if err != nil { - log.WithError(err).Error("cannot listen") - } - go func() { - select { - case <-ctx.Done(): - l.Close() - } - }() - - mux := http.NewServeMux() - mux.Handle("/metrics", promhttp.Handler()) - - err = http.Serve(l, mux) - if err != nil { - log.WithError(err).Error("error while serving") - } - -} diff --git a/cmd/daemon.go b/cmd/daemon.go deleted file mode 100644 index 4d4cb1b..0000000 --- a/cmd/daemon.go +++ /dev/null @@ -1,176 +0,0 @@ -package cmd - -import ( - "context" - "fmt" - "github.com/spf13/cobra" - "github.com/zrepl/zrepl/cmd/daemon" - "github.com/zrepl/zrepl/cmd/daemon/job" - "github.com/zrepl/zrepl/config" - "github.com/zrepl/zrepl/logger" - "os" - "os/signal" - "syscall" - "time" -) - -// daemonCmd represents the daemon command -var daemonCmd = &cobra.Command{ - Use: "daemon", - Short: "start daemon", - Run: doDaemon, -} - -func init() { - RootCmd.AddCommand(daemonCmd) -} - -type Job interface { - JobName() string - JobType() JobType - JobStart(ctxt context.Context) -} - -type JobType string - -const ( - JobTypePull JobType = "pull" - JobTypeSource JobType = "source" - JobTypeLocal JobType = "local" - JobTypePrometheus JobType = "prometheus" - JobTypeControl JobType = "control" -) - -func ParseUserJobType(s string) (JobType, error) { - switch s { - case "pull": - return JobTypePull, nil - case "source": - return JobTypeSource, nil - case "local": - return JobTypeLocal, nil - case "prometheus": - return JobTypePrometheus, nil - } - return "", fmt.Errorf("unknown job type '%s'", s) -} - -func (j JobType) String() string { - return string(j) -} - -type daemonJobAdaptor struct { - j Job -} - -func (a daemonJobAdaptor) Name() string { - return a.j.JobName() -} - -func (a daemonJobAdaptor) Run(ctx context.Context) { - a.j.JobStart(ctx) -} - -func (a daemonJobAdaptor) Status() interface{} { return nil } - -func doDaemon(cmd *cobra.Command, args []string) { - - conf, err := config.ParseConfig(rootArgs.configFile) - if err != nil { - fmt.Fprintf(os.Stderr, "error parsing config: %s\n", err) - os.Exit(1) - } - - outlets, err := parseLogging(conf.Global.Logging) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to generate logger: %s\n", err) - return - } - log := logger.NewLogger(outlets.Outlets, 1*time.Second) - - ctx := WithLogger(context.Background(), log) - - daemonJobs := make([]job.Job, 0, len(conf.Jobs)) - for i := range conf.Jobs { - parseJob() - daemonJobs = append(daemonJobs, daemonJobAdaptor{conf.Jobs[i]}) - } - daemon.Run(ctx, conf.Global.Control.Sockpath, conf.Global.logging.Outlets, daemonJobs) - -} - -type contextKey string - -const ( - contextKeyLog contextKey = contextKey("log") - contextKeyDaemon contextKey = contextKey("daemon") -) - -func getLogger(ctx context.Context) Logger { - return ctx.Value(contextKeyLog).(Logger) -} - -func WithLogger(ctx context.Context, l Logger) context.Context { - return context.WithValue(ctx, contextKeyLog, l) -} - -type Daemon struct { - conf *Config - startedAt time.Time -} - -func NewDaemon(initialConf *Config) *Daemon { - return &Daemon{conf: initialConf} -} - -func (d *Daemon) Loop(ctx context.Context) { - - d.startedAt = time.Now() - - log := getLogger(ctx) - - ctx, cancel := context.WithCancel(ctx) - ctx = context.WithValue(ctx, contextKeyDaemon, d) - - sigChan := make(chan os.Signal, 1) - finishs := make(chan Job) - - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - log.Info("starting jobs from config") - i := 0 - for _, job := range d.conf.Jobs { - logger := log.WithField(logJobField, job.JobName()) - logger.Info("starting") - i++ - jobCtx := WithLogger(ctx, logger) - go func(j Job) { - j.JobStart(jobCtx) - finishs <- j - }(job) - } - - finishCount := 0 -outer: - for { - select { - case <-finishs: - finishCount++ - if finishCount == len(d.conf.Jobs) { - log.Info("all jobs finished") - break outer - } - - case sig := <-sigChan: - log.WithField("signal", sig).Info("received signal") - log.Info("cancelling all jobs") - cancel() - } - } - - signal.Stop(sigChan) - cancel() // make go vet happy - - log.Info("exiting") - -} diff --git a/cmd/logging_formatters.go b/cmd/logging_formatters.go deleted file mode 100644 index 49ef88b..0000000 --- a/cmd/logging_formatters.go +++ /dev/null @@ -1,201 +0,0 @@ -package cmd - -import ( - "bytes" - "encoding/json" - "fmt" - "github.com/go-logfmt/logfmt" - "github.com/pkg/errors" - "github.com/zrepl/zrepl/logger" - "time" -) - -type EntryFormatter interface { - SetMetadataFlags(flags MetadataFlags) - Format(e *logger.Entry) ([]byte, error) -} - -const ( - FieldLevel = "level" - FieldMessage = "msg" - FieldTime = "time" -) - -const ( - logJobField string = "job" - logTaskField string = "task" - logSubsysField string = "subsystem" -) - -type NoFormatter struct{} - -func (f NoFormatter) SetMetadataFlags(flags MetadataFlags) {} - -func (f NoFormatter) Format(e *logger.Entry) ([]byte, error) { - return []byte(e.Message), nil -} - -type HumanFormatter struct { - metadataFlags MetadataFlags - ignoreFields map[string]bool -} - -const HumanFormatterDateFormat = time.RFC3339 - -func (f *HumanFormatter) SetMetadataFlags(flags MetadataFlags) { - f.metadataFlags = flags -} - -func (f *HumanFormatter) SetIgnoreFields(ignore []string) { - if ignore == nil { - f.ignoreFields = nil - return - } - f.ignoreFields = make(map[string]bool, len(ignore)) - - for _, field := range ignore { - f.ignoreFields[field] = true - } -} - -func (f *HumanFormatter) ignored(field string) bool { - return f.ignoreFields != nil && f.ignoreFields[field] -} - -func (f *HumanFormatter) Format(e *logger.Entry) (out []byte, err error) { - - var line bytes.Buffer - - if f.metadataFlags&MetadataTime != 0 { - fmt.Fprintf(&line, "%s ", e.Time.Format(HumanFormatterDateFormat)) - } - if f.metadataFlags&MetadataLevel != 0 { - fmt.Fprintf(&line, "[%s]", e.Level.Short()) - } - - prefixFields := []string{logJobField, logTaskField, logSubsysField} - prefixed := make(map[string]bool, len(prefixFields)+2) - for _, field := range prefixFields { - val, ok := e.Fields[field].(string) - if !ok { - continue - } - if !f.ignored(field) { - fmt.Fprintf(&line, "[%s]", val) - prefixed[field] = true - } - } - - if line.Len() > 0 { - fmt.Fprint(&line, ": ") - } - fmt.Fprint(&line, e.Message) - - if len(e.Fields)-len(prefixed) > 0 { - fmt.Fprint(&line, " ") - enc := logfmt.NewEncoder(&line) - for field, value := range e.Fields { - if prefixed[field] || f.ignored(field) { - continue - } - if err := logfmtTryEncodeKeyval(enc, field, value); err != nil { - return nil, err - } - } - } - - return line.Bytes(), nil -} - -type JSONFormatter struct { - metadataFlags MetadataFlags -} - -func (f *JSONFormatter) SetMetadataFlags(flags MetadataFlags) { - f.metadataFlags = flags -} - -func (f *JSONFormatter) Format(e *logger.Entry) ([]byte, error) { - data := make(logger.Fields, len(e.Fields)+3) - for k, v := range e.Fields { - switch v := v.(type) { - case error: - // Otherwise errors are ignored by `encoding/json` - // https://github.com/sirupsen/logrus/issues/137 - data[k] = v.Error() - default: - _, err := json.Marshal(v) - if err != nil { - return nil, errors.Errorf("field is not JSON encodable: %s", k) - } - data[k] = v - } - } - - data[FieldMessage] = e.Message - data[FieldTime] = e.Time.Format(time.RFC3339) - data[FieldLevel] = e.Level - - return json.Marshal(data) - -} - -type LogfmtFormatter struct { - metadataFlags MetadataFlags -} - -func (f *LogfmtFormatter) SetMetadataFlags(flags MetadataFlags) { - f.metadataFlags = flags -} - -func (f *LogfmtFormatter) Format(e *logger.Entry) ([]byte, error) { - var buf bytes.Buffer - enc := logfmt.NewEncoder(&buf) - - if f.metadataFlags&MetadataTime != 0 { - enc.EncodeKeyval(FieldTime, e.Time) - } - if f.metadataFlags&MetadataLevel != 0 { - enc.EncodeKeyval(FieldLevel, e.Level) - } - - // at least try and put job and task in front - prefixed := make(map[string]bool, 2) - prefix := []string{logJobField, logTaskField, logSubsysField} - for _, pf := range prefix { - v, ok := e.Fields[pf] - if !ok { - break - } - if err := logfmtTryEncodeKeyval(enc, pf, v); err != nil { - return nil, err // unlikely - } - prefixed[pf] = true - } - - enc.EncodeKeyval(FieldMessage, e.Message) - - for k, v := range e.Fields { - if !prefixed[k] { - if err := logfmtTryEncodeKeyval(enc, k, v); err != nil { - return nil, err - } - } - } - - return buf.Bytes(), nil -} - -func logfmtTryEncodeKeyval(enc *logfmt.Encoder, field, value interface{}) error { - - err := enc.EncodeKeyval(field, value) - switch err { - case nil: // ok - return nil - case logfmt.ErrUnsupportedValueType: - enc.EncodeKeyval(field, fmt.Sprintf("<%T>", value)) - return nil - } - return errors.Wrapf(err, "cannot encode field '%s'", field) - -} diff --git a/cmd/logging_outlets.go b/cmd/logging_outlets.go deleted file mode 100644 index d201ddc..0000000 --- a/cmd/logging_outlets.go +++ /dev/null @@ -1,161 +0,0 @@ -package cmd - -import ( - "bytes" - "context" - "crypto/tls" - "github.com/pkg/errors" - "github.com/zrepl/zrepl/logger" - "io" - "log/syslog" - "net" - "time" -) - -type WriterOutlet struct { - Formatter EntryFormatter - Writer io.Writer -} - -func (h WriterOutlet) WriteEntry(entry logger.Entry) error { - bytes, err := h.Formatter.Format(&entry) - if err != nil { - return err - } - _, err = h.Writer.Write(bytes) - h.Writer.Write([]byte("\n")) - return err -} - -type TCPOutlet struct { - formatter EntryFormatter - // Specifies how much time must pass between a connection error and a reconnection attempt - // Log entries written to the outlet during this time interval are silently dropped. - connect func(ctx context.Context) (net.Conn, error) - entryChan chan *bytes.Buffer -} - -func NewTCPOutlet(formatter EntryFormatter, network, address string, tlsConfig *tls.Config, retryInterval time.Duration) *TCPOutlet { - - connect := func(ctx context.Context) (conn net.Conn, err error) { - deadl, ok := ctx.Deadline() - if !ok { - deadl = time.Time{} - } - dialer := net.Dialer{ - Deadline: deadl, - } - if tlsConfig != nil { - conn, err = tls.DialWithDialer(&dialer, network, address, tlsConfig) - } else { - conn, err = dialer.DialContext(ctx, network, address) - } - return - } - - entryChan := make(chan *bytes.Buffer, 1) // allow one message in flight while previos is in io.Copy() - - o := &TCPOutlet{ - formatter: formatter, - connect: connect, - entryChan: entryChan, - } - - go o.outLoop(retryInterval) - - return o -} - -// FIXME: use this method -func (h *TCPOutlet) Close() { - close(h.entryChan) -} - -func (h *TCPOutlet) outLoop(retryInterval time.Duration) { - - var retry time.Time - var conn net.Conn - for msg := range h.entryChan { - var err error - for conn == nil { - time.Sleep(time.Until(retry)) - ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(retryInterval)) - conn, err = h.connect(ctx) - cancel() - if err != nil { - retry = time.Now().Add(retryInterval) - conn = nil - } - } - conn.SetWriteDeadline(time.Now().Add(retryInterval)) - _, err = io.Copy(conn, msg) - if err != nil { - retry = time.Now().Add(retryInterval) - conn.Close() - conn = nil - } - } -} - -func (h *TCPOutlet) WriteEntry(e logger.Entry) error { - - ebytes, err := h.formatter.Format(&e) - if err != nil { - return err - } - - buf := new(bytes.Buffer) - buf.Write(ebytes) - buf.WriteString("\n") - - select { - case h.entryChan <- buf: - return nil - default: - return errors.New("connection broken or not fast enough") - } -} - -type SyslogOutlet struct { - Formatter EntryFormatter - RetryInterval time.Duration - writer *syslog.Writer - lastConnectAttempt time.Time -} - -func (o *SyslogOutlet) WriteEntry(entry logger.Entry) error { - - bytes, err := o.Formatter.Format(&entry) - if err != nil { - return err - } - - s := string(bytes) - - if o.writer == nil { - now := time.Now() - if now.Sub(o.lastConnectAttempt) < o.RetryInterval { - return nil // not an error toward logger - } - o.writer, err = syslog.New(syslog.LOG_LOCAL0, "zrepl") - o.lastConnectAttempt = time.Now() - if err != nil { - o.writer = nil - return err - } - } - - switch entry.Level { - case logger.Debug: - return o.writer.Debug(s) - case logger.Info: - return o.writer.Info(s) - case logger.Warn: - return o.writer.Warning(s) - case logger.Error: - return o.writer.Err(s) - default: - return o.writer.Err(s) // write as error as reaching this case is in fact an error - } - -} diff --git a/cmd/main.go b/cmd/main.go deleted file mode 100644 index d459cd5..0000000 --- a/cmd/main.go +++ /dev/null @@ -1,43 +0,0 @@ -// zrepl replicates ZFS filesystems & volumes between pools -// -// Code Organization -// -// The cmd package uses github.com/spf13/cobra for its CLI. -// -// It combines the other packages in the zrepl project to implement zrepl functionality. -// -// Each subcommand's code is in the corresponding *.go file. -// All other *.go files contain code shared by the subcommands. -package cmd - -import ( - "github.com/spf13/cobra" - "github.com/zrepl/zrepl/logger" -) - -// -//type Logger interface { -// Printf(format string, v ...interface{}) -//} - -type Logger = logger.Logger - -var RootCmd = &cobra.Command{ - Use: "zrepl", - Short: "ZFS dataset replication", - Long: `Replicate ZFS filesystems & volumes between pools: - - - push & pull mode - - automatic snapshot creation & pruning - - local / over the network - - ACLs instead of blank SSH access`, -} - -var rootArgs struct { - configFile string -} - -func init() { - //cobra.OnInitialize(initConfig) - RootCmd.PersistentFlags().StringVar(&rootArgs.configFile, "config", "", "config file path") -} diff --git a/cmd/prune.go b/cmd/prune.go deleted file mode 100644 index 6963451..0000000 --- a/cmd/prune.go +++ /dev/null @@ -1,123 +0,0 @@ -package cmd - -import ( - "context" - "fmt" - "github.com/zrepl/zrepl/zfs" - "time" -) - -type Pruner struct { - Now time.Time - DryRun bool - DatasetFilter zfs.DatasetFilter - policies []PrunePolicy -} - -type PruneResult struct { - Filesystem *zfs.DatasetPath - All []zfs.FilesystemVersion - Keep []zfs.FilesystemVersion - Remove []zfs.FilesystemVersion -} - -func (p *Pruner) filterFilesystems(ctx context.Context) (filesystems []*zfs.DatasetPath, stop bool) { - filesystems, err := zfs.ZFSListMapping(p.DatasetFilter) - if err != nil { - getLogger(ctx).WithError(err).Error("error applying filesystem filter") - return nil, true - } - if len(filesystems) <= 0 { - getLogger(ctx).Info("no filesystems matching filter") - return nil, true - } - return filesystems, false -} - -func (p *Pruner) filterVersions(ctx context.Context, fs *zfs.DatasetPath) (fsversions []zfs.FilesystemVersion, stop bool) { - log := getLogger(ctx).WithField("fs", fs.ToString()) - - filter := AnyFSVFilter{} - fsversions, err := zfs.ZFSListFilesystemVersions(fs, filter) - if err != nil { - log.WithError(err).Error("error listing filesytem versions") - return nil, true - } - if len(fsversions) == 0 { - log.Info("no filesystem versions matching prefix") - return nil, true - } - return fsversions, false -} - -func (p *Pruner) pruneFilesystem(ctx context.Context, fs *zfs.DatasetPath) (r PruneResult, valid bool) { - log := getLogger(ctx).WithField("fs", fs.ToString()) - - fsversions, stop := p.filterVersions(ctx, fs) - if stop { - return - } - - keep, remove, err := p.PrunePolicy.Prune(fs, fsversions) - if err != nil { - log.WithError(err).Error("error evaluating prune policy") - return - } - - log.WithField("fsversions", fsversions). - WithField("keep", keep). - WithField("remove", remove). - Debug("prune policy debug dump") - - r = PruneResult{fs, fsversions, keep, remove} - - makeFields := func(v zfs.FilesystemVersion) (fields map[string]interface{}) { - fields = make(map[string]interface{}) - fields["version"] = v.ToAbsPath(fs) - timeSince := v.Creation.Sub(p.Now) - fields["age_ns"] = timeSince - const day time.Duration = 24 * time.Hour - days := timeSince / day - remainder := timeSince % day - fields["age_str"] = fmt.Sprintf("%dd%s", days, remainder) - return - } - - for _, v := range remove { - fields := makeFields(v) - log.WithFields(fields).Info("destroying version") - // echo what we'll do and exec zfs destroy if not dry run - // TODO special handling for EBUSY (zfs hold) - // TODO error handling for clones? just echo to cli, skip over, and exit with non-zero status code (we're idempotent) - if !p.DryRun { - err := zfs.ZFSDestroyFilesystemVersion(fs, v) - if err != nil { - log.WithFields(fields).WithError(err).Error("error destroying version") - } - } - } - return r, true -} - -func (p *Pruner) Run(ctx context.Context) (r []PruneResult, err error) { - if p.DryRun { - getLogger(ctx).Info("doing dry run") - } - - filesystems, stop := p.filterFilesystems(ctx) - if stop { - return - } - - r = make([]PruneResult, 0, len(filesystems)) - - for _, fs := range filesystems { - res, ok := p.pruneFilesystem(ctx, fs) - if ok { - r = append(r, res) - } - } - - return - -} diff --git a/cmd/sampleconf/localbackup/host1.yml b/cmd/sampleconf/localbackup/host1.yml deleted file mode 100644 index 5ef4f65..0000000 --- a/cmd/sampleconf/localbackup/host1.yml +++ /dev/null @@ -1,28 +0,0 @@ -jobs: -- name: mirror_local - type: local - - # snapshot the filesystems matched by the left-hand-side of the mapping - # every 10m with zrepl_ as prefix - mapping: { - "zroot/var/db<": "storage/backups/local/zroot/var/db", - "zroot/usr/home<": "storage/backups/local/zroot/usr/home", - "zroot/usr/home/paranoid": "!", #don't backup paranoid user - "zroot/poudriere/ports<": "!", #don't backup the ports trees - } - snapshot_prefix: zrepl_ - interval: 10m - - # keep one hour of 10m interval snapshots of filesystems matched by - # the left-hand-side of the mapping - prune_lhs: - policy: grid - grid: 1x1h(keep=all) - keep_bookmarks: all - - # follow a grandfathering scheme for filesystems on the right-hand-side of the mapping - prune_rhs: - policy: grid - grid: 1x1h(keep=all) | 24x1h | 35x1d | 6x30d - - diff --git a/cmd/sampleconf/pullbackup/backuphost.yml b/cmd/sampleconf/pullbackup/backuphost.yml deleted file mode 100644 index 206e947..0000000 --- a/cmd/sampleconf/pullbackup/backuphost.yml +++ /dev/null @@ -1,26 +0,0 @@ -jobs: -- name: fullbackup_prod1 - type: pull - # connect to remote using ssh / stdinserver command - connect: - type: ssh+stdinserver - host: prod1.example.com - user: root - port: 22 - identity_file: /root/.ssh/id_ed25519 - - # pull (=ask for new snapshots) every 10m, prune afterwards - # this will leave us at most 10m behind production - interval: 10m - - # pull all offered filesystems to storage/backups/zrepl/pull/prod1.example.com - mapping: { - "<":"storage/backups/zrepl/pull/prod1.example.com" - } - - # follow a grandfathering scheme for filesystems on the right-hand-side of the mapping - snapshot_prefix: zrepl_ - prune: - policy: grid - grid: 1x1h(keep=all) | 24x1h | 35x1d | 6x30d - diff --git a/cmd/sampleconf/pullbackup/productionhost.yml b/cmd/sampleconf/pullbackup/productionhost.yml deleted file mode 100644 index d2914e5..0000000 --- a/cmd/sampleconf/pullbackup/productionhost.yml +++ /dev/null @@ -1,47 +0,0 @@ -global: - serve: - stdinserver: - # Directory where AF_UNIX sockets for stdinserver command are placed. - # - # `zrepl stdinserver CLIENT_IDENTITY` - # * connects to the socket in $sockdir/CLIENT_IDENTITY - # * sends its stdin / stdout file descriptors to the `zrepl daemon` process (see cmsg(3)) - # * does nothing more - # - # This enables a setup where `zrepl daemon` is not directly exposed to the internet - # but instead all traffic is tunnelled through SSH. - # The server with the source job has an authorized_keys file entry for the public key - # used by the corresponding pull job - # - # command="/mnt/zrepl stdinserver CLIENT_IDENTITY" ssh-ed25519 AAAAC3NzaC1E... zrepl@pullingserver - # - # Below is the default value. - sockdir: /var/run/zrepl/stdinserver - -jobs: - -- name: fullbackup_prod1 - # expect remote to connect via ssh+stdinserver with fullbackup_prod1 as client_identity - type: source - serve: - type: stdinserver # see global.serve.stdinserver for explanation - client_identity: fullbackup_prod1 - - # snapshot these filesystems every 10m with zrepl_ as prefix - filesystems: { - "zroot/var/db<": "ok", - "zroot/usr/home<": "ok", - "zroot/var/tmp": "!", #don't backup /tmp - } - snapshot_prefix: zrepl_ - interval: 10m - - - # keep 1 hour of snapshots (6 at 10m interval) - # and one day of bookmarks in case pull doesn't work (link down, etc) - # => keep_bookmarks = 24h / interval = 24h / 10m = 144 - prune: - policy: grid - grid: 1x1h(keep=all) - keep_bookmarks: 144 - diff --git a/cmd/sampleconf/pushbackup/backuphost.yml b/cmd/sampleconf/pushbackup/backuphost.yml deleted file mode 100644 index 25431a6..0000000 --- a/cmd/sampleconf/pushbackup/backuphost.yml +++ /dev/null @@ -1,20 +0,0 @@ -jobs: -- name: fullbackup_prod1 - - # expect remote to connect via ssh+stdinserver with fullbackup_prod1 as client_identity - type: push-sink - serve: - type: stdinserver - client_identity: fullbackup_prod1 - - # map all pushed datasets to storage/backups/zrepl/sink/prod1.example.com - mapping: { - "<":"storage/backups/zrepl/sink/prod1.example.com" - } - - # follow a grandfathering scheme for filesystems on the right-hand-side of the mapping - prune: - policy: grid - grid: 1x1h(keep=all) | 24x1h | 35x1d | 6x30d - - diff --git a/cmd/sampleconf/pushbackup/productionhost.yml b/cmd/sampleconf/pushbackup/productionhost.yml deleted file mode 100644 index 5b81ecd..0000000 --- a/cmd/sampleconf/pushbackup/productionhost.yml +++ /dev/null @@ -1,26 +0,0 @@ -jobs: -- name: fullbackup_prod1 - - # connect to remote using ssh / stdinserver command - type: push - connect: - type: ssh+stdinserver - host: prod1.example.com - user: root - port: 22 - identity_file: /root/.ssh/id_ed25519 - - # snapshot these datsets every 10m with zrepl_ as prefix - filesystems: { - "zroot/var/db<": "ok", - "zroot/usr/home<": "!", - } - snapshot_prefix: zrepl_ - interval: 10m - - # keep a one day window 10m interval snapshots in case push doesn't work (link down, etc) - # (we cannot keep more than one day because this host will run out of disk space) - prune: - policy: grid - grid: 1x1d(keep=all) - diff --git a/cmd/sampleconf/random/debugging.yml b/cmd/sampleconf/random/debugging.yml deleted file mode 100644 index 237f3fe..0000000 --- a/cmd/sampleconf/random/debugging.yml +++ /dev/null @@ -1,33 +0,0 @@ -global: - serve: - stdinserver: - sockdir: /var/run/zrepl/stdinserver - -jobs: - -- name: debian2_pull - # JOB DEBUGGING OPTIONS - # should be equal for all job types, but each job implements the debugging itself - # => consult job documentation for supported options - debug: - conn: # debug the io.ReadWriteCloser connection - read_dump: /tmp/connlog_read # dump results of Read() invocations to this file - write_dump: /tmp/connlog_write # dump results of Write() invocations to this file - rpc: # debug the RPC protocol implementation - log: true # log output from rpc layer to the job log - - # ... just to make the unit tests pass. - # check other examples, e.g. localbackup or pullbackup for what the sutff below means - type: source - serve: - type: stdinserver - client_identity: debian2 - filesystems: { - "pool1/db<": ok - } - snapshot_prefix: zrepl_ - interval: 1s - prune: - policy: grid - grid: 1x10s(keep=all) - keep_bookmarks: all diff --git a/cmd/sampleconf/random/logging/client.crt b/cmd/sampleconf/random/logging/client.crt deleted file mode 100644 index 58dbfa5..0000000 --- a/cmd/sampleconf/random/logging/client.crt +++ /dev/null @@ -1,19 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDIzCCAgsCAQEwDQYJKoZIhvcNAQELBQAwWTELMAkGA1UEBhMCQVUxEzARBgNV -BAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0 -ZDESMBAGA1UEAwwJbG9nc2VydmVyMB4XDTE3MDkyNDEyMzAzNloXDTE3MTAyNDEy -MzAzNlowVjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNV -BAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAwwGY2xpZW50MIIB -IjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAt/xJTUlqApeJGzRD+w2J8sZS -Bo+s+04T987L/M6gaCo8aDSTEb/ZH3XSoU5JEmO6kPpwNNapOsaEhTCjndZQdm5F -uqiUtAg1uW0HCkBEIDkGr9bFHDKzpewGmmMgfQ2+hfiBR/4ZCrc/vd9P0W9BiWQS -Dtc7p22XraWPVL8HlSz5K/Ih+V6i8O+kBltZkusiJh2bWPoRp/netiTZuc6du+Wp -kpWp1OBaTU4GXIAlLj5afF14BBphRQK983Yhaz53BkA7OQ76XxowynMjmuLQVGmK -f1R9zEJuohTX9XIr1tp/ueRHcS4Awk6LcNZUMCV6270FNSIw2f4hbOZvep+t2wID -AQABMA0GCSqGSIb3DQEBCwUAA4IBAQACK3OeNzScpiNwz/jpg/usQzvXbZ/wDvml -YLjtzn/A65ox8a8BhxvH1ydyoCM2YAGYX7+y7qXJnMgRO/v8565CQIVcznHhg9ST -3828/WqZ3bXf2DV5GxKKQf7hPmBnyVUUhn/Ny91MECED27lZucWiX/bczN8ffDeh -M3+ngezcJxsOBd4x0gLrqIJCoaFRSeepOaFEW6GHQ8loxE9GmA7FQd2phIpJHFSd -Z7nQl7X5C1iN2OboEApJHwtmNVC45UlOpg53vo2sDTLhSfdogstiWi8x1HmvhIGM -j3XHs0Illvo9OwVrmgUph8zQ7pvr/AFrTOIbhgzl/9uVUk5ApwFM ------END CERTIFICATE----- diff --git a/cmd/sampleconf/random/logging/client.csr b/cmd/sampleconf/random/logging/client.csr deleted file mode 100644 index 380bb7f..0000000 --- a/cmd/sampleconf/random/logging/client.csr +++ /dev/null @@ -1,16 +0,0 @@ ------BEGIN CERTIFICATE REQUEST----- -MIICmzCCAYMCAQAwVjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx -ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAwwGY2xp -ZW50MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAt/xJTUlqApeJGzRD -+w2J8sZSBo+s+04T987L/M6gaCo8aDSTEb/ZH3XSoU5JEmO6kPpwNNapOsaEhTCj -ndZQdm5FuqiUtAg1uW0HCkBEIDkGr9bFHDKzpewGmmMgfQ2+hfiBR/4ZCrc/vd9P -0W9BiWQSDtc7p22XraWPVL8HlSz5K/Ih+V6i8O+kBltZkusiJh2bWPoRp/netiTZ -uc6du+WpkpWp1OBaTU4GXIAlLj5afF14BBphRQK983Yhaz53BkA7OQ76XxowynMj -muLQVGmKf1R9zEJuohTX9XIr1tp/ueRHcS4Awk6LcNZUMCV6270FNSIw2f4hbOZv -ep+t2wIDAQABoAAwDQYJKoZIhvcNAQELBQADggEBAKnlr0Qs5KYF85u2YA7DJ5pL -HwAx+qNoNbox5CS1aynrDBpDTWLaErviUJ+4WxRlRyTMEscMOIOKajbYhqqFmtGZ -mu3SshZnFihErw8TOQMyU1LGGG+l6r+6ve5TciwJRLla2Y75z7izr6cyvQNRWdLr -PvxL1/Yqr8LKha12+7o28R4SLf6/GY0GcedqoebRmtuwA/jES0PuGauEUD5lH4cj -Me8sqRrB+IMHQ5j8hlJX4DbA8UQRUBL64sHkQzeQfWu+qkWmS5I19CFfLNrcH+OV -yhyjGfN0q0jHyHdpckBhgzS7IIdo6P66AIlm4qpHM7Scra3JaGM7oaZPamJ6f8U= ------END CERTIFICATE REQUEST----- diff --git a/cmd/sampleconf/random/logging/client.key b/cmd/sampleconf/random/logging/client.key deleted file mode 100644 index a7f1565..0000000 --- a/cmd/sampleconf/random/logging/client.key +++ /dev/null @@ -1,28 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC3/ElNSWoCl4kb -NEP7DYnyxlIGj6z7ThP3zsv8zqBoKjxoNJMRv9kfddKhTkkSY7qQ+nA01qk6xoSF -MKOd1lB2bkW6qJS0CDW5bQcKQEQgOQav1sUcMrOl7AaaYyB9Db6F+IFH/hkKtz+9 -30/Rb0GJZBIO1zunbZetpY9UvweVLPkr8iH5XqLw76QGW1mS6yImHZtY+hGn+d62 -JNm5zp275amSlanU4FpNTgZcgCUuPlp8XXgEGmFFAr3zdiFrPncGQDs5DvpfGjDK -cyOa4tBUaYp/VH3MQm6iFNf1civW2n+55EdxLgDCTotw1lQwJXrbvQU1IjDZ/iFs -5m96n63bAgMBAAECggEAF4om0sWe06ARwbJJNFjCGpa3LfG5/xk5Qs5pmPnS2iD1 -Q5veaTnzjKvlfA/pF3o9B4mTS59fXY7Cq8vSU0J1XwGy2DPzeqlGPmgtq2kXjkvd -iCfhZj8ybvsoyR3/rSBSDRADcnOXPqC9fgyRSMmESBDOoql1D3HdIzF4ii46ySIU -/XQvExS6NWifbP+Ue6DETV8NhreO5PqjeXLITQhhndtc8MDL/8eCNOyN8XjYIWKX -smlBYtRQYOOY9BHOQgUn6yvPHrtKJNKci+qcQNvWir66mBhY1o40MH5wTIV+8yP2 -Vbm/VzoNKIYgeROsilBW7QTwGvkDn3R11zeTqfUNSQKBgQD0eFzhJAEZi4uBw6Tg -NKmBC5Y1IHPOsb5gKPNz9Z9j4qYRDySgYl6ISk+2EdhgUCo1NmTk8EIPQjIerUVf -S+EogFnpsj8U9LR3OM79DaGkNULxrHqhd209/g8DtVgk7yjkxL4vmVOv8qpHMp/7 -eWsylN7AOxj2RB/eXYQBPrw+jQKBgQDAqae9HasLmvpJ9ktTv30yZSKXC+LP4A0D -RBBmx410VpPd4CvcpCJxXmjer6B7+9L1xHYP2pvsnMBid5i0knuvyK28dYy7fldl -CzWvb+lqNA5YYPFXQED4oEdihlQczoI1Bm06SFizeAKD1Q9e2c+lgbR/51j8xuXi -twvhMj/YBwKBgQCZw97/iQrcC2Zq7yiUEOuQjD4lGk1c83U/vGIsTJC9XcCAOFsc -OeMlrD/oz96d7a4unBDn4qpaOJOXsfpRT0PGmrxy/jcpMiUUW/ntNpa11v5NTeQw -DRL8DAFbnsNbL8Yz5f+Nps35fBNYBuKTZLJlNTfKByHTO9QjpAQ0WEZEvQKBgQCi -Ovm83EuYVSKmvxcE6Tyx/8lVqTOO2Vn7wweQlD4/lVujvE0S2L8L+XSS9w5K+GzW -eFz10p3zarbw80YJ30L5bSEmjVE43BUZR4woMzM4M6dUsiTm1HshIE2b4ALZ0uZ/ -Ye794ceXL9nmSrVLqFsaQZLNFPCwwYb4FiyRry9lZwKBgAO9VbWcN8SEeBDKo3z8 -yRbRTc6sI+AdKY44Dfx0tqOPmTjO3mE4X1GU4sbfD2Bvg3DdjwTuxxC/jHaKu0GG -dTM0CbrZGbDAj7E87SOcN/PWEeBckSvuQq5H3DQfwIpTmlS1l5oZn9CxRGbLqC2G -ifnel8XWUG0ROybsr1tk4mzW ------END PRIVATE KEY----- diff --git a/cmd/sampleconf/random/logging/logserver.crt b/cmd/sampleconf/random/logging/logserver.crt deleted file mode 100644 index 87ec4bb..0000000 --- a/cmd/sampleconf/random/logging/logserver.crt +++ /dev/null @@ -1,21 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDiDCCAnCgAwIBAgIJALhp/WvTQeg/MA0GCSqGSIb3DQEBCwUAMFkxCzAJBgNV -BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX -aWRnaXRzIFB0eSBMdGQxEjAQBgNVBAMMCWxvZ3NlcnZlcjAeFw0xNzA5MjQxMjI3 -MDRaFw0yNzA5MjIxMjI3MDRaMFkxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21l -LVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxEjAQBgNV -BAMMCWxvZ3NlcnZlcjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKs3 -TLYfXhV3hap71tOkhPQlM+m0EKRAo8Nua50Cci5UhDo4JkVpyYok1h+NFkqmjU2b -IiIuGvsZZPOWYjbWWnSJE4+n5pBFBzcfNQ4d8xVxjANImFn6Tcehhj0WkbDIv/Ge -364XUgywS7u3EGQj/FO7vZ8KHlUxBHNuPIOPHftwIVRyleh5K32UyBaSpSmnqGos -rvI1byMuznavcZpOs4vlebZ+Jy6a20iKf9fj/0f0t0O+F5x3JIk07D3zSywhJ4RM -M0mGIUmYXbh2SMh+f61KDZLDANpz/pMAPbUJe0mxEtBf0tnwK1gEqc3SLwA0EwiM -8Hnn2iaH5Ln20UE3LOkCAwEAAaNTMFEwHQYDVR0OBBYEFDXoDcwx9SngzZcRYCeP -BplBecfiMB8GA1UdIwQYMBaAFDXoDcwx9SngzZcRYCePBplBecfiMA8GA1UdEwEB -/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBADyNvs4AA91x3gurQb1pcPVhK6nR -mkYSTN1AsDKSRi/X2iCUmR7G7FlF7XW8mntTpHvVzcs+gr94WckH5wqEOA5iZnaw -PXUWexmdXUge4hmC2q6kBQ5e2ykhSJMRVZXvOLZOZV9qitceamHESV1cKZSNMvZM -aCSVA1RK61/nUzs04pVp5PFPv9gFxJp9ki39FYFdsgZmM5RZ5I/FqxxvTJzu4RnH -VPjsMopzARYwJw6dV2bKdFSYOE8B/Vs3Yv0GxjrABw2ko4PkBPTjLIz22x6+Hd9r -K9BQi4pVmQfvppF5+SORSftlHSS+N47b0DD1rW1f5R6QGi71dFuJGikOwvY= ------END CERTIFICATE----- diff --git a/cmd/sampleconf/random/logging/logserver.key b/cmd/sampleconf/random/logging/logserver.key deleted file mode 100644 index 8eb8faa..0000000 --- a/cmd/sampleconf/random/logging/logserver.key +++ /dev/null @@ -1,28 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCrN0y2H14Vd4Wq -e9bTpIT0JTPptBCkQKPDbmudAnIuVIQ6OCZFacmKJNYfjRZKpo1NmyIiLhr7GWTz -lmI21lp0iROPp+aQRQc3HzUOHfMVcYwDSJhZ+k3HoYY9FpGwyL/xnt+uF1IMsEu7 -txBkI/xTu72fCh5VMQRzbjyDjx37cCFUcpXoeSt9lMgWkqUpp6hqLK7yNW8jLs52 -r3GaTrOL5Xm2ficumttIin/X4/9H9LdDvhecdySJNOw980ssISeETDNJhiFJmF24 -dkjIfn+tSg2SwwDac/6TAD21CXtJsRLQX9LZ8CtYBKnN0i8ANBMIjPB559omh+S5 -9tFBNyzpAgMBAAECggEBAIY8ZwJq+WKvQLb3POjWFf8so9TY/ispGrwAeJKy9j5o -uPrERw0o8YBDfTVjclS43BQ6Srqtly3DLSjlgL8ps+WmCxYYN2ZpGE0ZRIl65bis -O2/fnML+wbiAZTTD2xnVatfPDeP6GLQmDFpyHoHEzPIBQZvNXRbBxZGSnhMvQ/x7 -FhqSBQG4kf3b1XDCENIbFEVOBOCg7WtMiIgjEGS7QnW3I65/Zt+Ts1LXRZbz+6na -Gmi0PGHA/oLUh1NRzsF4zuZn6fFzja5zw4mkt+JvCWEoxg1QhRAxRp6QQwmZ6MIc -1rw1D4Z+c5UEKyqHeIwZj4M6UNPhCfTXVm47c9eSiGECgYEA4U8pB+7eRo2fqX0C -nWsWMcmsULJvwplQnUSFenUayPn3E8ammS/ZBHksoKhj82vwIdDbtS1hQZn8Bzsi -atc8au0wz0YRDcVDzHX4HknXVQayHtP/FTPeSr5hwpoY8vhEbySuxBTBkXCrp4dx -u5ErfOiYEP3Q1ZvPRywelrATu20CgYEAwonV5dgOcen/4oAirlnvufc2NfqhAQwJ -FJ/JSVMAcXxPYu3sZMv0dGWrX8mLc+P1+XMCuV/7eBM/vU2LbDzmpeUV8sJfB2jw -wyKqKXZwBgeq60btriA4f+0ElwRGgU2KSiniUuuTX2JmyftFQx4cVAQRCFk27NY0 -09psSsYyre0CgYBo6unabdtH029EB5iOIW3GZXk+Yrk0TxyA/4WAjsOYTv5FUT4H -G4bdVGf5sDBLDDpYJOAKsEUXvVLlMx5FzlCuIiGWg7QxS2jU7yJJSG1jhKixPlsM -Toj3GUyAyC1SB1Ymw1g2qsuwpFzquGG3zFQJ6G3Xi7oRnmqZY+wik3+8yQKBgB11 -SdKYOPe++2SNCrNkIw0CBk9+OEs0S1u4Jn7X9sU4kbzlUlqhF89YZe8HUfqmlmTD -qbHwet/f6lL8HxSw1Cxi2EP+cu1oUqz53tKQgL4pAxTFlNA9SND2Ty+fEh4aY8p/ -NSphSduzxuTnC8HyGVAPnZSqDcsnVLCP7r4T7TCxAoGAbJygkkk/gZ9pT4fZoIaq -8CMR8FTfxtkwCuZsWccSMUOWtx9nqet3gbCpKHfyoYZiKB4ke+lnUz4uFS16Y3hG -kN0hFfvfoNa8eB2Ox7vs60cMMfWJac0H7KSaDDy+EvbhE2KtQADT0eWxMyhzGR8p -5CbIivB0QCjeQIA8dOQpE8E= ------END PRIVATE KEY----- diff --git a/cmd/sampleconf/random/logging_and_monitoring.yml b/cmd/sampleconf/random/logging_and_monitoring.yml deleted file mode 100644 index d739f99..0000000 --- a/cmd/sampleconf/random/logging_and_monitoring.yml +++ /dev/null @@ -1,28 +0,0 @@ -global: - logging: - - - outlet: stdout - level: warn - format: human - - - outlet: tcp - level: debug - format: json - net: tcp - address: 127.0.0.1:8080 - retry_interval: 1s - tls: # if not specified, use plain TCP - ca: sampleconf/random/logging/logserver.crt - cert: sampleconf/random/logging/client.crt - key: sampleconf/random/logging/client.key - - - outlet: syslog - level: debug - format: logfmt - - monitoring: - - - type: prometheus - listen: ':9090' - -jobs: [] diff --git a/cmd/stdinserver.go b/cmd/stdinserver.go deleted file mode 100644 index 6d5d41e..0000000 --- a/cmd/stdinserver.go +++ /dev/null @@ -1,55 +0,0 @@ -package cmd - -import ( - "os" - - "context" - "github.com/problame/go-netssh" - "github.com/spf13/cobra" - "log" - "path" -) - -var StdinserverCmd = &cobra.Command{ - Use: "stdinserver CLIENT_IDENTITY", - Short: "start in stdinserver mode (from authorized_keys file)", - Run: cmdStdinServer, -} - -func init() { - RootCmd.AddCommand(StdinserverCmd) -} - -func cmdStdinServer(cmd *cobra.Command, args []string) { - - // NOTE: the netssh proxying protocol requires exiting with non-zero status if anything goes wrong - defer os.Exit(1) - - log := log.New(os.Stderr, "", log.LUTC|log.Ldate|log.Ltime) - - conf, err := ParseConfig(rootArgs.configFile) - if err != nil { - log.Printf("error parsing config: %s", err) - return - } - - if len(args) != 1 || args[0] == "" { - log.Print("must specify client_identity as positional argument") - return - } - - identity := args[0] - unixaddr := path.Join(conf.Global.Serve.Stdinserver.SockDir, identity) - - log.Printf("proxying client identity '%s' to zrepl daemon '%s'", identity, unixaddr) - - ctx := netssh.ContextWithLog(context.TODO(), log) - - err = netssh.Proxy(ctx, unixaddr) - if err == nil { - log.Print("proxying finished successfully, exiting with status 0") - os.Exit(0) - } - log.Printf("error proxying: %s", err) - -} diff --git a/cmd/test.go b/cmd/test.go deleted file mode 100644 index 78f4e71..0000000 --- a/cmd/test.go +++ /dev/null @@ -1,214 +0,0 @@ -package cmd - -import ( - "os" - - "bytes" - "context" - "fmt" - "sort" - "strings" - - "github.com/kr/pretty" - "github.com/spf13/cobra" - "github.com/zrepl/zrepl/logger" - "github.com/zrepl/zrepl/zfs" - "time" -) - -var testCmd = &cobra.Command{ - Use: "test", - Short: "test configuration", - PersistentPreRun: testCmdGlobalInit, -} - -var testCmdGlobal struct { - log Logger - conf *Config -} - -var testConfigSyntaxCmd = &cobra.Command{ - Use: "config", - Short: "parse config file and dump parsed datastructure", - Run: doTestConfig, -} - -var testDatasetMapFilter = &cobra.Command{ - Use: "pattern jobname test/zfs/dataset/path", - Short: "test dataset mapping / filter specified in config", - Example: ` zrepl test pattern my_pull_job tank/tmp`, - Run: doTestDatasetMapFilter, -} - -var testPrunePolicyArgs struct { - side PrunePolicySide - showKept bool - showRemoved bool -} - -var testPrunePolicyCmd = &cobra.Command{ - Use: "prune jobname", - Short: "do a dry-run of the pruning part of a job", - Run: doTestPrunePolicy, -} - -func init() { - RootCmd.AddCommand(testCmd) - testCmd.AddCommand(testConfigSyntaxCmd) - testCmd.AddCommand(testDatasetMapFilter) - - testPrunePolicyCmd.Flags().VarP(&testPrunePolicyArgs.side, "side", "s", "prune_lhs (left) or prune_rhs (right)") - testPrunePolicyCmd.Flags().BoolVar(&testPrunePolicyArgs.showKept, "kept", false, "show kept snapshots") - testPrunePolicyCmd.Flags().BoolVar(&testPrunePolicyArgs.showRemoved, "removed", true, "show removed snapshots") - testCmd.AddCommand(testPrunePolicyCmd) -} - -func testCmdGlobalInit(cmd *cobra.Command, args []string) { - - out := logger.NewOutlets() - out.Add(WriterOutlet{&NoFormatter{}, os.Stdout}, logger.Info) - log := logger.NewLogger(out, 1*time.Second) - testCmdGlobal.log = log - - var err error - if testCmdGlobal.conf, err = ParseConfig(rootArgs.configFile); err != nil { - testCmdGlobal.log.Printf("error parsing config file: %s", err) - os.Exit(1) - } - -} - -func doTestConfig(cmd *cobra.Command, args []string) { - - log, conf := testCmdGlobal.log, testCmdGlobal.conf - - log.Printf("config ok") - log.Printf("%# v", pretty.Formatter(conf)) - return -} - -func doTestDatasetMapFilter(cmd *cobra.Command, args []string) { - - log, conf := testCmdGlobal.log, testCmdGlobal.conf - - if len(args) != 2 { - log.Printf("specify job name as first postitional argument, test input as second") - log.Printf(cmd.UsageString()) - os.Exit(1) - } - n, i := args[0], args[1] - - jobi, err := conf.LookupJob(n) - if err != nil { - log.Printf("%s", err) - os.Exit(1) - } - - var mf *DatasetMapFilter - switch j := jobi.(type) { - case *PullJob: - mf = j.Mapping - case *SourceJob: - mf = j.Filesystems - case *LocalJob: - mf = j.Mapping - default: - panic("incomplete implementation") - } - - ip, err := zfs.NewDatasetPath(i) - if err != nil { - log.Printf("cannot parse test input as ZFS dataset path: %s", err) - os.Exit(1) - } - - if mf.filterMode { - pass, err := mf.Filter(ip) - if err != nil { - log.Printf("error evaluating filter: %s", err) - os.Exit(1) - } - log.Printf("filter result: %v", pass) - } else { - res, err := mf.Map(ip) - if err != nil { - log.Printf("error evaluating mapping: %s", err) - os.Exit(1) - } - toStr := "NO MAPPING" - if res != nil { - toStr = res.ToString() - } - log.Printf("%s => %s", ip.ToString(), toStr) - - } - -} - -func doTestPrunePolicy(cmd *cobra.Command, args []string) { - - log, conf := testCmdGlobal.log, testCmdGlobal.conf - - if cmd.Flags().NArg() != 1 { - log.Printf("specify job name as first positional argument") - log.Printf(cmd.UsageString()) - os.Exit(1) - } - - jobname := cmd.Flags().Arg(0) - jobi, err := conf.LookupJob(jobname) - if err != nil { - log.Printf("%s", err) - os.Exit(1) - } - - jobp, ok := jobi.(PruningJob) - if !ok { - log.Printf("job doesn't do any prunes") - os.Exit(0) - } - - log.Printf("job dump:\n%s", pretty.Sprint(jobp)) - - pruner, err := jobp.Pruner(testPrunePolicyArgs.side, true) - if err != nil { - log.Printf("cannot create test pruner: %s", err) - os.Exit(1) - } - - log.Printf("start pruning") - - ctx := WithLogger(context.Background(), log) - result, err := pruner.Run(ctx) - if err != nil { - log.Printf("error running pruner: %s", err) - os.Exit(1) - } - - sort.Slice(result, func(i, j int) bool { - return strings.Compare(result[i].Filesystem.ToString(), result[j].Filesystem.ToString()) == -1 - }) - - var b bytes.Buffer - for _, r := range result { - fmt.Fprintf(&b, "%s\n", r.Filesystem.ToString()) - - if testPrunePolicyArgs.showKept { - fmt.Fprintf(&b, "\tkept:\n") - for _, v := range r.Keep { - fmt.Fprintf(&b, "\t- %s\n", v.Name) - } - } - - if testPrunePolicyArgs.showRemoved { - fmt.Fprintf(&b, "\tremoved:\n") - for _, v := range r.Remove { - fmt.Fprintf(&b, "\t- %s\n", v.Name) - } - } - - } - - log.Printf("pruning result:\n%s", b.String()) - -} diff --git a/cmd/version.go b/cmd/version.go deleted file mode 100644 index ab838b2..0000000 --- a/cmd/version.go +++ /dev/null @@ -1,21 +0,0 @@ -package cmd - -import ( - "fmt" - "github.com/spf13/cobra" - "github.com/zrepl/zrepl/version" -) - -var versionCmd = &cobra.Command{ - Use: "version", - Short: "print version of zrepl binary (for running daemon 'zrepl control version' command)", - Run: doVersion, -} - -func init() { - RootCmd.AddCommand(versionCmd) -} - -func doVersion(cmd *cobra.Command, args []string) { - fmt.Println(version.NewZreplVersionInformation().String()) -}