config: unify job debugging options

This commit is contained in:
Christian Schwarz 2017-09-11 15:45:10 +02:00
parent 93a58a36bf
commit 1deaa459c8
9 changed files with 112 additions and 45 deletions

View File

@ -3,7 +3,6 @@ package cmd
import (
"io"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/zfs"
)
@ -20,8 +19,18 @@ type Global struct {
}
}
type RPCConnecter interface {
Connect() (rpc.RPCClient, error)
type JobDebugSettings struct {
Conn struct {
ReadDump string `mapstructure:"read_dump"`
WriteDump string `mapstructure:"write_dump"`
}
RPC struct {
Log bool
}
}
type RWCConnecter interface {
Connect() (io.ReadWriteCloser, error)
}
type AuthenticatedChannelListenerFactory interface {
Listen() (AuthenticatedChannelListener, error)

View File

@ -7,9 +7,7 @@ import (
"github.com/jinzhu/copier"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/sshbytestream"
"github.com/zrepl/zrepl/util"
)
type SSHStdinserverConnecter struct {
@ -20,8 +18,6 @@ type SSHStdinserverConnecter struct {
TransportOpenCommand []string `mapstructure:"transport_open_command"`
SSHCommand string `mapstructure:"ssh_command"`
Options []string
ConnLogReadFile string `mapstructure:"connlog_read_file"`
ConnLogWriteFile string `mapstructure:"connlog_write_file"`
}
func parseSSHStdinserverConnecter(i map[string]interface{}) (c *SSHStdinserverConnecter, err error) {
@ -37,20 +33,14 @@ func parseSSHStdinserverConnecter(i map[string]interface{}) (c *SSHStdinserverCo
}
func (c *SSHStdinserverConnecter) Connect() (client rpc.RPCClient, err error) {
var stream io.ReadWriteCloser
func (c *SSHStdinserverConnecter) Connect() (rwc io.ReadWriteCloser, err error) {
var rpcTransport sshbytestream.SSHTransport
if err = copier.Copy(&rpcTransport, c); err != nil {
return
}
if stream, err = sshbytestream.Outgoing(rpcTransport); err != nil {
if rwc, err = sshbytestream.Outgoing(rpcTransport); err != nil {
err = errors.WithStack(err)
return
}
stream, err = util.NewReadWriteCloserLogger(stream, c.ConnLogReadFile, c.ConnLogWriteFile)
if err != nil {
return
}
client = rpc.NewClient(stream)
return client, nil
return
}

View File

@ -3,30 +3,32 @@ package cmd
import (
"time"
"github.com/zrepl/zrepl/rpc"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/rpc"
)
type LocalJob struct {
Name string
Mapping *DatasetMapFilter
SnapshotFilter *PrefixSnapshotFilter
SnapshotFilter *PrefixSnapshotFilter
Interval time.Duration
InitialReplPolicy InitialReplPolicy
PruneLHS PrunePolicy
PruneRHS PrunePolicy
Debug JobDebugSettings
}
func parseLocalJob(name string, i map[string]interface{}) (j *LocalJob, err error) {
var asMap struct {
Mapping map[string]string
SnapshotPrefix string `mapstructure:"snapshot_prefix"`
Interval string
InitialReplPolicy string `mapstructure:"initial_repl_policy"`
PruneLHS map[string]interface{} `mapstructure:"prune_lhs"`
PruneRHS map[string]interface{} `mapstructure:"prune_rhs"`
Mapping map[string]string
SnapshotPrefix string `mapstructure:"snapshot_prefix"`
Interval string
InitialReplPolicy string `mapstructure:"initial_repl_policy"`
PruneLHS map[string]interface{} `mapstructure:"prune_lhs"`
PruneRHS map[string]interface{} `mapstructure:"prune_rhs"`
Debug map[string]interface{}
}
if err = mapstructure.Decode(i, &asMap); err != nil {
@ -62,6 +64,11 @@ func parseLocalJob(name string, i map[string]interface{}) (j *LocalJob, err erro
return
}
if err = mapstructure.Decode(asMap.Debug, &j.Debug); err != nil {
err = errors.Wrap(err, "cannot parse 'debug'")
return
}
return
}

View File

@ -5,15 +5,18 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/util"
)
type PullJob struct {
Name string
Connect RPCConnecter
Connect RWCConnecter
Mapping *DatasetMapFilter
SnapshotFilter *PrefixSnapshotFilter
InitialReplPolicy InitialReplPolicy
Prune PrunePolicy
Debug JobDebugSettings
}
func parsePullJob(name string, i map[string]interface{}) (j *PullJob, err error) {
@ -24,6 +27,7 @@ func parsePullJob(name string, i map[string]interface{}) (j *PullJob, err error)
InitialReplPolicy string `mapstructure:"initial_repl_policy"`
Prune map[string]interface{}
SnapshotPrefix string `mapstructure:"snapshot_prefix"`
Debug map[string]interface{}
}
if err = mapstructure.Decode(i, &asMap); err != nil {
@ -60,6 +64,11 @@ func parsePullJob(name string, i map[string]interface{}) (j *PullJob, err error)
return
}
if err = mapstructure.Decode(asMap.Debug, &j.Debug); err != nil {
err = errors.Wrap(err, "cannot parse 'debug'")
return
}
return
}
@ -68,11 +77,23 @@ func (j *PullJob) JobName() string {
}
func (j *PullJob) JobDo(log Logger) (err error) {
client, err := j.Connect.Connect()
rwc, err := j.Connect.Connect()
if err != nil {
log.Printf("error connect: %s", err)
return err
}
rwc, err = util.NewReadWriteCloserLogger(rwc, j.Debug.Conn.ReadDump, j.Debug.Conn.WriteDump)
if err != nil {
return
}
client := rpc.NewClient(rwc)
if j.Debug.RPC.Log {
client.SetLogger(log, true)
}
defer closeRPCWithTimeout(log, client, time.Second*10, "")
return doPull(PullContext{client, log, j.Mapping, j.InitialReplPolicy})
}

View File

@ -1,15 +1,15 @@
package cmd
import (
"time"
mapstructure "github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/rpc"
"github.com/zrepl/zrepl/util"
"io"
"os"
"os/signal"
"syscall"
"time"
)
type SourceJob struct {
@ -19,6 +19,7 @@ type SourceJob struct {
SnapshotFilter *PrefixSnapshotFilter
Interval time.Duration
Prune PrunePolicy
Debug JobDebugSettings
}
func parseSourceJob(name string, i map[string]interface{}) (j *SourceJob, err error) {
@ -29,6 +30,7 @@ func parseSourceJob(name string, i map[string]interface{}) (j *SourceJob, err er
SnapshotPrefix string `mapstructure:"snapshot_prefix"`
Interval string
Prune map[string]interface{}
Debug map[string]interface{}
}
if err = mapstructure.Decode(i, &asMap); err != nil {
@ -59,6 +61,11 @@ func parseSourceJob(name string, i map[string]interface{}) (j *SourceJob, err er
return
}
if err = mapstructure.Decode(asMap.Debug, &j.Debug); err != nil {
err = errors.Wrap(err, "cannot parse 'debug'")
return
}
return
}
@ -100,6 +107,11 @@ outer:
break outer // closed because of accept error
}
rwc, err := util.NewReadWriteCloserLogger(rwc, j.Debug.Conn.ReadDump, j.Debug.Conn.WriteDump)
if err != nil {
panic(err)
}
// construct connection handler
handler := Handler{
Logger: log,
@ -108,6 +120,10 @@ outer:
// handle connection
rpcServer := rpc.NewServer(rwc)
if j.Debug.RPC.Log {
rpclog := util.NewPrefixLogger(log, "rpc")
rpcServer.SetLogger(rpclog, true)
}
registerEndpoints(rpcServer, handler)
if err = rpcServer.Serve(); err != nil {
log.Printf("error serving connection: %s", err)

View File

@ -115,7 +115,7 @@ func parseJob(i map[string]interface{}) (j Job, err error) {
}
func parseConnect(i map[string]interface{}) (c RPCConnecter, err error) {
func parseConnect(i map[string]interface{}) (c RWCConnecter, err error) {
t, err := extractStringField(i, "type", true)
if err != nil {

View File

@ -4,7 +4,6 @@ import (
"github.com/ftrvxmtrx/fd"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/util"
"io"
"net"
"os"
@ -13,9 +12,7 @@ import (
)
type StdinserverListenerFactory struct {
ClientIdentity string `mapstructure:"client_identity"`
ConnLogReadFile string `mapstructure:"connlog_read_file"`
ConnLogWriteFile string `mapstructure:"connlog_write_file"`
ClientIdentity string `mapstructure:"client_identity"`
}
func parseStdinserverListenerFactory(i map[string]interface{}) (f *StdinserverListenerFactory, err error) {
@ -63,15 +60,13 @@ func (f *StdinserverListenerFactory) Listen() (al AuthenticatedChannelListener,
return nil, errors.Wrapf(err, "cannot listen on unix socket %s", unixaddr)
}
l := &StdinserverListener{ul, f.ConnLogReadFile, f.ConnLogWriteFile}
l := &StdinserverListener{ul}
return l, nil
}
type StdinserverListener struct {
l *net.UnixListener
ConnLogReadFile string `mapstructure:"connlog_read_file"`
ConnLogWriteFile string `mapstructure:"connlog_write_file"`
l *net.UnixListener
}
type fdRWC struct {
@ -109,12 +104,7 @@ func (l *StdinserverListener) Accept() (ch io.ReadWriteCloser, err error) {
rwc := fdRWC{files[0], files[1], c.(*net.UnixConn)}
rwclog, err := util.NewReadWriteCloserLogger(rwc, l.ConnLogReadFile, l.ConnLogWriteFile)
if err != nil {
panic(err)
}
return rwclog, nil
return rwc, nil
}

View File

@ -11,10 +11,11 @@ import (
func TestSampleConfigsAreParsedWithoutErrors(t *testing.T) {
paths:= []string{
paths := []string{
"./sampleconf/localbackup/host1.yml",
"./sampleconf/pullbackup/backuphost.yml",
"./sampleconf/pullbackup/productionhost.yml",
"./sampleconf/random/debugging.yml",
}
for _, p := range paths {

View File

@ -0,0 +1,33 @@
global:
serve:
stdinserver:
sockdir: /var/run/zrepl/stdinserver
jobs:
- name: debian2_pull
# JOB DEBUGGING OPTIONS
# should be equal for all job types, but each job implements the debugging itself
# => consult job documentation for supported options
debug:
conn: # debug the io.ReadWriteCloser connection
read_dump: /tmp/connlog_read # dump results of Read() invocations to this file
write_dump: /tmp/connlog_write # dump results of Write() invocations to this file
rpc: # debug the RPC protocol implementation
log: true # log output from rpc layer to the job log
# ... just to make the unit tests pass.
# check other examples, e.g. localbackup or pullbackup for what the sutff below means
type: source
serve:
type: stdinserver
client_identity: debian2
datasets: {
"pool1/db<": ok
}
snapshot_prefix: zrepl_
interval: 1s
prune:
policy: grid
grid: 1x10s(keep=all)