diff --git a/client/status.go b/client/status.go index 5182772..7847c84 100644 --- a/client/status.go +++ b/client/status.go @@ -62,7 +62,7 @@ func (t *tui) addIndent(indent int) { t.moveLine(0, 0) } -func RunStatus(config config.Config, args []string) error { +func RunStatus(config *config.Config, args []string) error { httpc, err := controlHttpClient(config.Global.Control.SockPath) if err != nil { return err diff --git a/client/wakeup.go b/client/wakeup.go index 97fb180..d6a224f 100644 --- a/client/wakeup.go +++ b/client/wakeup.go @@ -6,7 +6,7 @@ import ( "github.com/zrepl/zrepl/daemon" ) -func RunWakeup(config config.Config, args []string) error { +func RunWakeup(config *config.Config, args []string) error { if len(args) != 1 { return errors.Errorf("Expected 1 argument: job") } diff --git a/config/config.go b/config/config.go index 93a7a29..d6a334f 100644 --- a/config/config.go +++ b/config/config.go @@ -9,11 +9,12 @@ import ( "regexp" "strconv" "time" + "reflect" ) type Config struct { Jobs []JobEnum `yaml:"jobs"` - Global Global `yaml:"global"` + Global *Global `yaml:"global,optional,fromdefaults"` } type JobEnum struct { @@ -102,11 +103,51 @@ type PruningLocal struct { Keep []PruningEnum `yaml:"keep"` } +type LoggingOutletEnumList []LoggingOutletEnum + +func (l *LoggingOutletEnumList) SetDefault() { + def := ` +type: "stdout" +time: true +level: "warn" +format: "human" +` + s := StdoutLoggingOutlet{} + err := yaml.UnmarshalStrict([]byte(def), &s) + if err != nil { + panic(err) + } + *l = []LoggingOutletEnum{LoggingOutletEnum{Ret: s}} +} + +var _ yaml.Defaulter = &LoggingOutletEnumList{} + type Global struct { - Logging []LoggingOutletEnum `yaml:"logging"` + Logging *LoggingOutletEnumList `yaml:"logging,optional,fromdefaults"` Monitoring []MonitoringEnum `yaml:"monitoring,optional"` - Control GlobalControl `yaml:"control"` - Serve GlobalServe `yaml:"serve"` + Control *GlobalControl `yaml:"control,optional,fromdefaults"` + Serve *GlobalServe `yaml:"serve,optional,fromdefaults"` + RPC *RPCConfig `yaml:"rpc,optional,fromdefaults"` +} + +func Default(i interface{}) { + v := reflect.ValueOf(i) + if v.Kind() != reflect.Ptr { + panic(v) + } + y := `{}` + err := yaml.Unmarshal([]byte(y), v.Interface()) + if err != nil { + panic(err) + } +} + +type RPCConfig struct { + Timeout time.Duration `yaml:"timeout,optional,positive,default=10s"` + TxChunkSize uint `yaml:"tx_chunk_size,optional,default=32768"` + RxStructuredMaxLen uint `yaml:"rx_structured_max,optional,default=16777216"` + RxStreamChunkMaxLen uint `yaml:"rx_stream_chunk_max,optional,default=16777216"` + RxHeaderMaxLen uint `yaml:"rx_header_max,optional,default=40960"` } type ConnectEnum struct { @@ -115,12 +156,14 @@ type ConnectEnum struct { type TCPConnect struct { Type string `yaml:"type"` + RPC *RPCConfig `yaml:"rpc,optional"` Address string `yaml:"address"` DialTimeout time.Duration `yaml:"dial_timeout,positive,default=10s"` } type TLSConnect struct { Type string `yaml:"type"` + RPC *RPCConfig `yaml:"rpc,optional"` Address string `yaml:"address"` Ca string `yaml:"ca"` Cert string `yaml:"cert"` @@ -131,6 +174,7 @@ type TLSConnect struct { type SSHStdinserverConnect struct { Type string `yaml:"type"` + RPC *RPCConfig `yaml:"rpc,optional"` Host string `yaml:"host"` User string `yaml:"user"` Port uint16 `yaml:"port"` @@ -233,7 +277,7 @@ type GlobalControl struct { } type GlobalServe struct { - StdinServer GlobalStdinServer `yaml:"stdinserver"` + StdinServer *GlobalStdinServer `yaml:"stdinserver,optional,fromdefaults"` } type GlobalStdinServer struct { @@ -241,10 +285,11 @@ type GlobalStdinServer struct { } type JobDebugSettings struct { - Conn struct { + Conn *struct { ReadDump string `yaml:"read_dump"` WriteDump string `yaml:"write_dump"` - } `yaml:"conn"` + } `yaml:"conn,optional"` + RPCLog bool `yaml:"rpc_log,optional,default=false"` } func enumUnmarshal(u func(interface{}, bool) error, types map[string]interface{}) (interface{}, error) { @@ -328,7 +373,7 @@ var ConfigFileDefaultLocations = []string{ "/usr/local/etc/zrepl/zrepl.yml", } -func ParseConfig(path string) (i Config, err error) { +func ParseConfig(path string) (i *Config, err error) { if path == "" { // Try default locations @@ -352,11 +397,18 @@ func ParseConfig(path string) (i Config, err error) { return } - if err = yaml.UnmarshalStrict(bytes, &i); err != nil { - return - } + return ParseConfigBytes(bytes) +} - return +func ParseConfigBytes(bytes []byte) (*Config, error) { + var c *Config + if err := yaml.UnmarshalStrict(bytes, &c); err != nil { + return nil, err + } + if c == nil { + return nil, fmt.Errorf("config is empty or only consists of comments") + } + return c, nil } var durationStringRegex *regexp.Regexp = regexp.MustCompile(`^\s*(\d+)\s*(s|m|h|d|w)\s*$`) diff --git a/config/config_minimal_test.go b/config/config_minimal_test.go new file mode 100644 index 0000000..82edde8 --- /dev/null +++ b/config/config_minimal_test.go @@ -0,0 +1,41 @@ +package config + +import ( + "testing" + "github.com/stretchr/testify/assert" +) + +func TestConfigEmptyFails(t *testing.T) { + conf, err := testConfig(t, "\n") + assert.Nil(t, conf) + assert.Error(t, err) +} + +func TestJobsOnlyWorks(t *testing.T) { + testValidConfig(t, ` +jobs: +- name: push + type: push + # snapshot the filesystems matched by the left-hand-side of the mapping + # every 10m with zrepl_ as prefix + replication: + connect: + type: tcp + address: localhost:2342 + filesystems: { + "pool1/var/db<": true, + "pool1/usr/home<": true, + "pool1/usr/home/paranoid": false, #don't backup paranoid user + "pool1/poudriere/ports<": false #don't backup the ports trees + } + snapshotting: + snapshot_prefix: zrepl_ + interval: 10m + pruning: + keep_sender: + - type: not_replicated + keep_receiver: + - type: last_n + count: 1 +`) +} \ No newline at end of file diff --git a/config/config_rpc_test.go b/config/config_rpc_test.go new file mode 100644 index 0000000..0fd2a4c --- /dev/null +++ b/config/config_rpc_test.go @@ -0,0 +1,61 @@ +package config + +import ( + "testing" + "github.com/stretchr/testify/assert" + "time" +) + +func TestRPC (t *testing.T) { + conf := testValidConfig(t, ` +jobs: +- name: pull_servers + type: pull + replication: + connect: + type: tcp + address: "server1.foo.bar:8888" + rpc: + timeout: 20s # different form default, should merge + root_dataset: "pool2/backup_servers" + interval: 10m + pruning: + keep_sender: + - type: not_replicated + keep_receiver: + - type: last_n + count: 100 + +- name: pull_servers2 + type: pull + replication: + connect: + type: tcp + address: "server1.foo.bar:8888" + rpc: + tx_chunk_size: 0xabcd # different from default, should merge + root_dataset: "pool2/backup_servers" + interval: 10m + pruning: + keep_sender: + - type: not_replicated + keep_receiver: + - type: last_n + count: 100 +`) + + assert.Equal(t, 20*time.Second, conf.Jobs[0].Ret.(*PullJob).Replication.Connect.Ret.(*TCPConnect).RPC.Timeout) + assert.Equal(t, uint(0xabcd), conf.Jobs[1].Ret.(*PullJob).Replication.Connect.Ret.(*TCPConnect).RPC.TxChunkSize) + defConf := RPCConfig{} + Default(&defConf) + assert.Equal(t, defConf.Timeout, conf.Global.RPC.Timeout) +} + +func TestGlobal_DefaultRPCConfig(t *testing.T) { + assert.NotPanics(t, func() { + var c RPCConfig + Default(&c) + assert.NotNil(t, c) + assert.Equal(t, c.TxChunkSize, uint(1)<<15) + }) +} diff --git a/config/config_test.go b/config/config_test.go index ad975d0..64a12f9 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -4,6 +4,9 @@ import ( "github.com/kr/pretty" "path/filepath" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/zrepl/yaml-config" ) func TestSampleConfigsAreParsedWithoutErrors(t *testing.T) { @@ -27,3 +30,26 @@ func TestSampleConfigsAreParsedWithoutErrors(t *testing.T) { } } + +func TestLoggingOutletEnumList_SetDefaults(t *testing.T) { + e := &LoggingOutletEnumList{} + var i yaml.Defaulter = e + require.NotPanics(t, func() { + i.SetDefault() + assert.Equal(t, "warn", (*e)[0].Ret.(StdoutLoggingOutlet).Level) + }) +} + + +func testValidConfig(t *testing.T, input string) (*Config) { + t.Helper() + conf, err := testConfig(t, input) + require.NoError(t, err) + require.NotNil(t, conf) + return conf +} + +func testConfig(t *testing.T, input string) (*Config, error) { + t.Helper() + return ParseConfigBytes([]byte(input)) +} \ No newline at end of file diff --git a/daemon/connecter/connecter.go b/daemon/connecter/connecter.go index a27ba7d..a5d95ad 100644 --- a/daemon/connecter/connecter.go +++ b/daemon/connecter/connecter.go @@ -6,7 +6,7 @@ import ( "github.com/zrepl/zrepl/config" ) -func FromConfig(g config.Global, in config.ConnectEnum) (streamrpc.Connecter, error) { +func FromConfig(g *config.Global, in config.ConnectEnum) (streamrpc.Connecter, error) { switch v := in.Ret.(type) { case *config.SSHStdinserverConnect: return SSHStdinserverConnecterFromConfig(v) diff --git a/daemon/daemon.go b/daemon/daemon.go index e231d72..078f1f4 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -17,7 +17,7 @@ import ( "time" ) -func Run(conf config.Config) error { +func Run(conf *config.Config) error { ctx, cancel := context.WithCancel(context.Background()) @@ -29,7 +29,7 @@ func Run(conf config.Config) error { cancel() }() - outlets, err := logging.OutletsFromConfig(conf.Global.Logging) + outlets, err := logging.OutletsFromConfig(*conf.Global.Logging) if err != nil { return errors.Wrap(err, "cannot build logging from config") } diff --git a/daemon/job/build_jobs.go b/daemon/job/build_jobs.go index 8843b55..b438cac 100644 --- a/daemon/job/build_jobs.go +++ b/daemon/job/build_jobs.go @@ -6,7 +6,7 @@ import ( "github.com/pkg/errors" ) -func JobsFromConfig(c config.Config) ([]Job, error) { +func JobsFromConfig(c *config.Config) ([]Job, error) { js := make([]Job, len(c.Jobs)) for i := range c.Jobs { j, err := buildJob(c.Global, c.Jobs[i]) @@ -18,7 +18,7 @@ func JobsFromConfig(c config.Config) ([]Job, error) { return js, nil } -func buildJob(c config.Global, in config.JobEnum) (j Job, err error) { +func buildJob(c *config.Global, in config.JobEnum) (j Job, err error) { switch v := in.Ret.(type) { case *config.SinkJob: j, err = SinkFromConfig(c, v) diff --git a/daemon/job/push.go b/daemon/job/push.go index e4750c2..088e863 100644 --- a/daemon/job/push.go +++ b/daemon/job/push.go @@ -25,7 +25,7 @@ type Push struct { replication *replication.Replication } -func PushFromConfig(g config.Global, in *config.PushJob) (j *Push, err error) { +func PushFromConfig(g *config.Global, in *config.PushJob) (j *Push, err error) { j = &Push{} j.name = in.Name diff --git a/daemon/job/sink.go b/daemon/job/sink.go index 8c9f496..3c20ae3 100644 --- a/daemon/job/sink.go +++ b/daemon/job/sink.go @@ -19,7 +19,7 @@ type Sink struct { fsmapInv endpoint.FSFilter } -func SinkFromConfig(g config.Global, in *config.SinkJob) (s *Sink, err error) { +func SinkFromConfig(g *config.Global, in *config.SinkJob) (s *Sink, err error) { // FIXME multi client support diff --git a/daemon/logging/build_logging.go b/daemon/logging/build_logging.go index 9d88d41..fbaf9c2 100644 --- a/daemon/logging/build_logging.go +++ b/daemon/logging/build_logging.go @@ -16,7 +16,7 @@ import ( "os" ) -func OutletsFromConfig(in []config.LoggingOutletEnum) (*logger.Outlets, error) { +func OutletsFromConfig(in config.LoggingOutletEnumList) (*logger.Outlets, error) { outlets := logger.NewOutlets() diff --git a/daemon/serve/serve.go b/daemon/serve/serve.go index 8ab7945..4df8488 100644 --- a/daemon/serve/serve.go +++ b/daemon/serve/serve.go @@ -10,7 +10,7 @@ type ListenerFactory interface { Listen() (net.Listener, error) } -func FromConfig(g config.Global, in config.ServeEnum) (ListenerFactory, error) { +func FromConfig(g *config.Global, in config.ServeEnum) (ListenerFactory, error) { switch v := in.Ret.(type) { case *config.TCPServe: diff --git a/daemon/serve/serve_stdinserver.go b/daemon/serve/serve_stdinserver.go index 4048d65..f6403d3 100644 --- a/daemon/serve/serve_stdinserver.go +++ b/daemon/serve/serve_stdinserver.go @@ -15,7 +15,7 @@ type StdinserverListenerFactory struct { sockpath string } -func StdinserverListenerFactoryFromConfig(g config.Global, in *config.StdinserverServer) (f *StdinserverListenerFactory, err error) { +func StdinserverListenerFactoryFromConfig(g *config.Global, in *config.StdinserverServer) (f *StdinserverListenerFactory, err error) { f = &StdinserverListenerFactory{ ClientIdentity: in.ClientIdentity, diff --git a/daemon/serve/serve_tcp.go b/daemon/serve/serve_tcp.go index a5bad28..21cab59 100644 --- a/daemon/serve/serve_tcp.go +++ b/daemon/serve/serve_tcp.go @@ -9,7 +9,7 @@ type TCPListenerFactory struct { Address string } -func TCPListenerFactoryFromConfig(c config.Global, in *config.TCPServe) (*TCPListenerFactory, error) { +func TCPListenerFactoryFromConfig(c *config.Global, in *config.TCPServe) (*TCPListenerFactory, error) { lf := &TCPListenerFactory{ Address: in.Listen, } diff --git a/daemon/serve/serve_tls.go b/daemon/serve/serve_tls.go index f24f5ad..0b80345 100644 --- a/daemon/serve/serve_tls.go +++ b/daemon/serve/serve_tls.go @@ -18,7 +18,7 @@ type TLSListenerFactory struct { handshakeTimeout time.Duration } -func TLSListenerFactoryFromConfig(c config.Global, in *config.TLSServe) (lf *TLSListenerFactory, err error) { +func TLSListenerFactoryFromConfig(c *config.Global, in *config.TLSServe) (lf *TLSListenerFactory, err error) { lf = &TLSListenerFactory{ address: in.Listen, }