mirror of
https://github.com/zrepl/zrepl.git
synced 2025-01-03 04:48:55 +01:00
build logger from new config
This commit is contained in:
parent
38bb78b642
commit
13dc63bd23
@ -1,30 +1,33 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"github.com/zrepl/yaml-config"
|
||||
"fmt"
|
||||
"time"
|
||||
"os"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/zrepl/yaml-config"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
type NodeEnum struct {
|
||||
type Config struct {
|
||||
Jobs []JobEnum `yaml:"jobs"`
|
||||
Global Global `yaml:"global"`
|
||||
}
|
||||
|
||||
type JobEnum struct {
|
||||
Ret interface{}
|
||||
}
|
||||
|
||||
type PushNode struct {
|
||||
type PushJob struct {
|
||||
Type string `yaml:"type"`
|
||||
Replication PushReplication `yaml:"replication"`
|
||||
Snapshotting Snapshotting `yaml:"snapshotting"`
|
||||
Pruning Pruning `yaml:"pruning"`
|
||||
Global Global `yaml:"global"`
|
||||
}
|
||||
|
||||
type SinkNode struct {
|
||||
type SinkJob struct {
|
||||
Type string `yaml:"type"`
|
||||
Replication SinkReplication `yaml:"replication"`
|
||||
Global Global `yaml:"global"`
|
||||
}
|
||||
|
||||
type PushReplication struct {
|
||||
@ -48,7 +51,7 @@ type Pruning struct {
|
||||
}
|
||||
|
||||
type Global struct {
|
||||
Logging []LoggingOutlet `yaml:"logging"`
|
||||
Logging []LoggingOutletEnum `yaml:"logging"`
|
||||
}
|
||||
|
||||
type ConnectEnum struct {
|
||||
@ -104,26 +107,40 @@ type PruneGrid struct {
|
||||
Grid string `yaml:"grid"`
|
||||
}
|
||||
|
||||
type LoggingOutlet struct {
|
||||
Outlet LoggingOutletEnum `yaml:"outlet"`
|
||||
Level string `yaml:"level"`
|
||||
Format string `yaml:"format"`
|
||||
}
|
||||
|
||||
type LoggingOutletEnum struct {
|
||||
Ret interface{}
|
||||
}
|
||||
|
||||
type StdoutLoggingOutlet struct {
|
||||
type LoggingOutletCommon struct {
|
||||
Type string `yaml:"type"`
|
||||
Level string `yaml:"level"`
|
||||
Format string `yaml:"format"`
|
||||
}
|
||||
|
||||
type StdoutLoggingOutlet struct {
|
||||
LoggingOutletCommon `yaml:",inline"`
|
||||
Time bool `yaml:"time"`
|
||||
}
|
||||
|
||||
type SyslogLoggingOutlet struct {
|
||||
Type string `yaml:"type"`
|
||||
LoggingOutletCommon `yaml:",inline"`
|
||||
RetryInterval time.Duration `yaml:"retry_interval"`
|
||||
}
|
||||
|
||||
type TCPLoggingOutlet struct {
|
||||
LoggingOutletCommon `yaml:",inline"`
|
||||
Address string `yaml:"address"` //TODO required
|
||||
Net string `yaml:"net"` //TODO default tcp
|
||||
RetryInterval time.Duration `yaml:"retry_interval"`
|
||||
TLS *TCPLoggingOutletTLS
|
||||
}
|
||||
|
||||
type TCPLoggingOutletTLS struct {
|
||||
CA string `yaml:"ca"`
|
||||
Cert string `yaml:"cert"`
|
||||
Key string `yaml:"key"`
|
||||
}
|
||||
|
||||
func enumUnmarshal(u func(interface{}, bool) error, types map[string]interface{}) (interface{}, error) {
|
||||
var in struct {
|
||||
Type string
|
||||
@ -145,10 +162,10 @@ func enumUnmarshal(u func(interface{}, bool) error, types map[string]interface{}
|
||||
return v, nil
|
||||
}
|
||||
|
||||
func (t *NodeEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) {
|
||||
func (t *JobEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) {
|
||||
t.Ret, err = enumUnmarshal(u, map[string]interface{}{
|
||||
"push": &PushNode{},
|
||||
"sink": &SinkNode{},
|
||||
"push": &PushJob{},
|
||||
"sink": &SinkJob{},
|
||||
})
|
||||
return
|
||||
}
|
||||
@ -182,6 +199,7 @@ func (t *LoggingOutletEnum) UnmarshalYAML(u func(interface{}, bool) error) (err
|
||||
t.Ret, err = enumUnmarshal(u, map[string]interface{}{
|
||||
"stdout": &StdoutLoggingOutlet{},
|
||||
"syslog": &SyslogLoggingOutlet{},
|
||||
"tcp": &TCPLoggingOutlet{},
|
||||
})
|
||||
return
|
||||
}
|
||||
@ -191,7 +209,7 @@ var ConfigFileDefaultLocations = []string{
|
||||
"/usr/local/etc/zrepl/zrepl.yml",
|
||||
}
|
||||
|
||||
func ParseConfig(path string) (i NodeEnum, err error) {
|
||||
func ParseConfig(path string) (i Config, err error) {
|
||||
|
||||
if path == "" {
|
||||
// Try default locations
|
||||
|
@ -1,9 +1,9 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"github.com/kr/pretty"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSampleConfigsAreParsedWithoutErrors(t *testing.T) {
|
||||
|
@ -1,4 +1,5 @@
|
||||
type: push
|
||||
jobs:
|
||||
- type: push
|
||||
replication:
|
||||
connect:
|
||||
type: tcp
|
||||
@ -23,8 +24,7 @@ pruning:
|
||||
grid: 1x1h(keep=all) | 24x1h | 35x1d | 6x30d
|
||||
global:
|
||||
logging:
|
||||
- outlet:
|
||||
type: "stdout"
|
||||
- type: "stdout"
|
||||
time: true
|
||||
level: "warn"
|
||||
format: "human"
|
||||
|
@ -1,4 +1,5 @@
|
||||
type: sink
|
||||
jobs:
|
||||
- type: sink
|
||||
replication:
|
||||
root_dataset: "pool2/backup_laptops"
|
||||
serve:
|
||||
@ -9,7 +10,11 @@ replication:
|
||||
key: "key.pem"
|
||||
global:
|
||||
logging:
|
||||
- outlet:
|
||||
type: "syslog"
|
||||
- type: "tcp"
|
||||
address: "123.123.123.123:1234"
|
||||
tls:
|
||||
ca: "ca.pem"
|
||||
cert: "cert.pem"
|
||||
key: "key.pem"
|
||||
level: "warn"
|
||||
format: "human"
|
||||
|
@ -4,12 +4,11 @@ import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"github.com/mattn/go-isatty"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/zrepl/zrepl/cmd/config"
|
||||
"github.com/zrepl/zrepl/cmd/tlsconf"
|
||||
"github.com/zrepl/zrepl/logger"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
type LoggingConfig struct {
|
||||
@ -26,16 +25,12 @@ const (
|
||||
MetadataAll MetadataFlags = ^0
|
||||
)
|
||||
|
||||
func parseLogging(i interface{}) (c *LoggingConfig, err error) {
|
||||
func parseLogging(in []config.LoggingOutletEnum) (c *LoggingConfig, err error) {
|
||||
|
||||
c = &LoggingConfig{}
|
||||
c.Outlets = logger.NewOutlets()
|
||||
|
||||
var asList []interface{}
|
||||
if err = mapstructure.Decode(i, &asList); err != nil {
|
||||
return nil, errors.Wrap(err, "mapstructure error")
|
||||
}
|
||||
if len(asList) == 0 {
|
||||
if len(in) == 0 {
|
||||
// Default config
|
||||
out := WriterOutlet{&HumanFormatter{}, os.Stdout}
|
||||
c.Outlets.Add(out, logger.Warn)
|
||||
@ -43,7 +38,7 @@ func parseLogging(i interface{}) (c *LoggingConfig, err error) {
|
||||
}
|
||||
|
||||
var syslogOutlets, stdoutOutlets int
|
||||
for lei, le := range asList {
|
||||
for lei, le := range in {
|
||||
|
||||
outlet, minLevel, err := parseOutlet(le)
|
||||
if err != nil {
|
||||
@ -95,56 +90,52 @@ func parseLogFormat(i interface{}) (f EntryFormatter, err error) {
|
||||
|
||||
}
|
||||
|
||||
func parseOutlet(i interface{}) (o logger.Outlet, level logger.Level, err error) {
|
||||
func parseOutlet(in config.LoggingOutletEnum) (o logger.Outlet, level logger.Level, err error) {
|
||||
|
||||
var in struct {
|
||||
Outlet string
|
||||
Level string
|
||||
Format string
|
||||
}
|
||||
if err = mapstructure.Decode(i, &in); err != nil {
|
||||
err = errors.Wrap(err, "mapstructure error")
|
||||
return
|
||||
}
|
||||
if in.Outlet == "" || in.Level == "" || in.Format == "" {
|
||||
err = errors.Errorf("must specify 'outlet', 'level' and 'format' field")
|
||||
return
|
||||
parseCommon := func(common config.LoggingOutletCommon) (logger.Level, EntryFormatter, error) {
|
||||
if common.Level == "" || common.Format == "" {
|
||||
return 0, nil, errors.Errorf("must specify 'level' and 'format' field")
|
||||
}
|
||||
|
||||
minLevel, err := logger.ParseLevel(in.Level)
|
||||
minLevel, err := logger.ParseLevel(common.Level)
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "cannot parse 'level' field")
|
||||
return
|
||||
return 0, nil, errors.Wrap(err, "cannot parse 'level' field")
|
||||
}
|
||||
formatter, err := parseLogFormat(in.Format)
|
||||
formatter, err := parseLogFormat(common.Format)
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "cannot parse")
|
||||
return
|
||||
return 0, nil, errors.Wrap(err, "cannot parse 'formatter' field")
|
||||
}
|
||||
return minLevel, formatter, nil
|
||||
}
|
||||
|
||||
switch in.Outlet {
|
||||
case "stdout":
|
||||
o, err = parseStdoutOutlet(i, formatter)
|
||||
case "tcp":
|
||||
o, err = parseTCPOutlet(i, formatter)
|
||||
case "syslog":
|
||||
o, err = parseSyslogOutlet(i, formatter)
|
||||
var f EntryFormatter
|
||||
|
||||
switch v := in.Ret.(type) {
|
||||
case config.StdoutLoggingOutlet:
|
||||
level, f, err = parseCommon(v.LoggingOutletCommon)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
o, err = parseStdoutOutlet(v, f)
|
||||
case config.TCPLoggingOutlet:
|
||||
level, f, err = parseCommon(v.LoggingOutletCommon)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
o, err = parseTCPOutlet(v, f)
|
||||
case config.SyslogLoggingOutlet:
|
||||
level, f, err = parseCommon(v.LoggingOutletCommon)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
o, err = parseSyslogOutlet(v, f)
|
||||
default:
|
||||
err = errors.Errorf("unknown outlet type '%s'", in.Outlet)
|
||||
panic(v)
|
||||
}
|
||||
return o, minLevel, err
|
||||
|
||||
}
|
||||
|
||||
func parseStdoutOutlet(i interface{}, formatter EntryFormatter) (WriterOutlet, error) {
|
||||
|
||||
var in struct {
|
||||
Time bool
|
||||
}
|
||||
if err := mapstructure.Decode(i, &in); err != nil {
|
||||
return WriterOutlet{}, errors.Wrap(err, "invalid structure for stdout outlet")
|
||||
return o, level, err
|
||||
}
|
||||
|
||||
func parseStdoutOutlet(in config.StdoutLoggingOutlet, formatter EntryFormatter) (WriterOutlet, error) {
|
||||
flags := MetadataAll
|
||||
writer := os.Stdout
|
||||
if !isatty.IsTerminal(writer.Fd()) && !in.Time {
|
||||
@ -158,54 +149,22 @@ func parseStdoutOutlet(i interface{}, formatter EntryFormatter) (WriterOutlet, e
|
||||
}, nil
|
||||
}
|
||||
|
||||
func parseTCPOutlet(i interface{}, formatter EntryFormatter) (out *TCPOutlet, err error) {
|
||||
|
||||
var in struct {
|
||||
Net string
|
||||
Address string
|
||||
RetryInterval string `mapstructure:"retry_interval"`
|
||||
TLS map[string]interface{}
|
||||
}
|
||||
if err = mapstructure.Decode(i, &in); err != nil {
|
||||
return nil, errors.Wrap(err, "mapstructure error")
|
||||
}
|
||||
|
||||
retryInterval, err := time.ParseDuration(in.RetryInterval)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot parse 'retry_interval'")
|
||||
}
|
||||
|
||||
if len(in.Net) == 0 {
|
||||
return nil, errors.New("field 'net' must not be empty")
|
||||
}
|
||||
if len(in.Address) == 0 {
|
||||
return nil, errors.New("field 'address' must not be empty")
|
||||
}
|
||||
|
||||
func parseTCPOutlet(in config.TCPLoggingOutlet, formatter EntryFormatter) (out *TCPOutlet, err error) {
|
||||
var tlsConfig *tls.Config
|
||||
if in.TLS != nil {
|
||||
tlsConfig, err = func(m map[string]interface{}, host string) (*tls.Config, error) {
|
||||
var in struct {
|
||||
CA string
|
||||
Cert string
|
||||
Key string
|
||||
}
|
||||
if err := mapstructure.Decode(m, &in); err != nil {
|
||||
return nil, errors.Wrap(err, "mapstructure error")
|
||||
}
|
||||
|
||||
clientCert, err := tls.LoadX509KeyPair(in.Cert, in.Key)
|
||||
tlsConfig, err = func(m *config.TCPLoggingOutletTLS, host string) (*tls.Config, error) {
|
||||
clientCert, err := tls.LoadX509KeyPair(m.Cert, m.Key)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot load client cert")
|
||||
}
|
||||
|
||||
var rootCAs *x509.CertPool
|
||||
if in.CA == "" {
|
||||
if m.CA == "" {
|
||||
if rootCAs, err = x509.SystemCertPool(); err != nil {
|
||||
return nil, errors.Wrap(err, "cannot open system cert pool")
|
||||
}
|
||||
} else {
|
||||
rootCAs, err = tlsconf.ParseCAFile(in.CA)
|
||||
rootCAs, err = tlsconf.ParseCAFile(m.CA)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot parse CA cert")
|
||||
}
|
||||
@ -222,30 +181,14 @@ func parseTCPOutlet(i interface{}, formatter EntryFormatter) (out *TCPOutlet, er
|
||||
}
|
||||
|
||||
formatter.SetMetadataFlags(MetadataAll)
|
||||
return NewTCPOutlet(formatter, in.Net, in.Address, tlsConfig, retryInterval), nil
|
||||
return NewTCPOutlet(formatter, in.Net, in.Address, tlsConfig, in.RetryInterval), nil
|
||||
|
||||
}
|
||||
|
||||
func parseSyslogOutlet(i interface{}, formatter EntryFormatter) (out *SyslogOutlet, err error) {
|
||||
|
||||
var in struct {
|
||||
RetryInterval string `mapstructure:"retry_interval"`
|
||||
}
|
||||
if err = mapstructure.Decode(i, &in); err != nil {
|
||||
return nil, errors.Wrap(err, "mapstructure error")
|
||||
}
|
||||
|
||||
func parseSyslogOutlet(in config.SyslogLoggingOutlet, formatter EntryFormatter) (out *SyslogOutlet, err error) {
|
||||
out = &SyslogOutlet{}
|
||||
out.Formatter = formatter
|
||||
out.Formatter.SetMetadataFlags(MetadataNone)
|
||||
|
||||
out.RetryInterval = 0 // default to 0 as we assume local syslog will just work
|
||||
if in.RetryInterval != "" {
|
||||
out.RetryInterval, err = time.ParseDuration(in.RetryInterval)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "cannot parse 'retry_interval'")
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
out.RetryInterval = in.RetryInterval
|
||||
return out, nil
|
||||
}
|
||||
|
@ -4,14 +4,14 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/zrepl/zrepl/cmd/config"
|
||||
"github.com/zrepl/zrepl/logger"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
"syscall"
|
||||
"time"
|
||||
"github.com/zrepl/zrepl/cmd/daemon"
|
||||
"github.com/zrepl/zrepl/cmd/daemon/job"
|
||||
"github.com/zrepl/zrepl/logger"
|
||||
)
|
||||
|
||||
// daemonCmd represents the daemon command
|
||||
@ -75,13 +75,19 @@ func (a daemonJobAdaptor) Status() interface{} { return nil }
|
||||
|
||||
func doDaemon(cmd *cobra.Command, args []string) {
|
||||
|
||||
conf, err := ParseConfig(rootArgs.configFile)
|
||||
conf, err := config.ParseConfig(rootArgs.configFile)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "error parsing config: %s\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
log := logger.NewLogger(conf.Global.logging.Outlets, 1*time.Second)
|
||||
outlets, err := parseLogging(conf.Global.Logging)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "failed to generate logger: %s\n", err)
|
||||
return
|
||||
}
|
||||
log := logger.NewLogger(outlets.Outlets, 1*time.Second)
|
||||
|
||||
ctx := WithLogger(context.Background(), log)
|
||||
|
||||
daemonJobs := make([]job.Job, 0, len(conf.Jobs))
|
||||
|
Loading…
Reference in New Issue
Block a user