start implementing new daemon in package daemon
commit 89dc267780
parent c7237cb09d
daemon/control.go (new file)
@@ -0,0 +1,142 @@
package daemon

import (
    "bytes"
    "context"
    "encoding/json"
    "github.com/pkg/errors"
    "github.com/zrepl/zrepl/cmd/helpers"
    "github.com/zrepl/zrepl/daemon/job"
    "github.com/zrepl/zrepl/logger"
    "github.com/zrepl/zrepl/version"
    "io"
    "net"
    "net/http"
)

// controlJob serves the daemon's local control socket (status, version and
// pprof control endpoints) over HTTP.
type controlJob struct {
    sockaddr *net.UnixAddr
    jobs     *jobs
}

func newControlJob(sockpath string, jobs *jobs) (j *controlJob, err error) {
    j = &controlJob{jobs: jobs}

    j.sockaddr, err = net.ResolveUnixAddr("unix", sockpath)
    if err != nil {
        err = errors.Wrap(err, "cannot resolve unix address")
        return
    }

    return
}

func (j *controlJob) Name() string { return jobNameControl }

func (j *controlJob) Status() interface{} { return nil }

const (
    ControlJobEndpointPProf   string = "/debug/pprof"
    ControlJobEndpointVersion string = "/version"
    ControlJobEndpointStatus  string = "/status"
)

func (j *controlJob) Run(ctx context.Context) {

    log := job.GetLogger(ctx)
    defer log.Info("control job finished")

    l, err := helpers.ListenUnixPrivate(j.sockaddr)
    if err != nil {
        log.WithError(err).Error("error listening")
        return
    }

    pprofServer := NewPProfServer(ctx)

    mux := http.NewServeMux()
    mux.Handle(ControlJobEndpointPProf, requestLogger{log: log, handlerFunc: func(w http.ResponseWriter, r *http.Request) {
        var msg PprofServerControlMsg
        err := json.NewDecoder(r.Body).Decode(&msg)
        if err != nil {
            log.WithError(err).Error("bad pprof request from client")
            w.WriteHeader(http.StatusBadRequest)
            return
        }
        pprofServer.Control(msg)
        w.WriteHeader(200)
    }})
    mux.Handle(ControlJobEndpointVersion,
        requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
            return version.NewZreplVersionInformation(), nil
        }}})
    mux.Handle(ControlJobEndpointStatus,
        requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
            s := j.jobs.status()
            return s, nil
        }}})
    server := http.Server{Handler: mux}

outer:
    for {

        served := make(chan error)
        go func() {
            served <- server.Serve(l)
            close(served)
        }()

        select {
        case <-ctx.Done():
            log.WithError(ctx.Err()).Info("context done")
            server.Shutdown(context.Background())
            break outer
        case err = <-served:
            if err != nil {
                log.WithError(err).Error("error serving")
                break outer
            }
        }

    }

}

type jsonResponder struct {
    producer func() (interface{}, error)
}

func (j jsonResponder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    res, err := j.producer()
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        io.WriteString(w, err.Error())
        return
    }
    var buf bytes.Buffer
    err = json.NewEncoder(&buf).Encode(res)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        io.WriteString(w, err.Error())
    } else {
        io.Copy(w, &buf)
    }
}

type requestLogger struct {
    log         logger.Logger
    handler     http.Handler
    handlerFunc http.HandlerFunc
}

func (l requestLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    log := l.log.WithField("method", r.Method).WithField("url", r.URL)
    log.Info("start")
    if l.handlerFunc != nil {
        l.handlerFunc(w, r)
    } else if l.handler != nil {
        l.handler.ServeHTTP(w, r)
    } else {
        log.Error("no handler or handlerFunc configured")
    }
    log.Info("finish")
}
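The control endpoints above are plain HTTP served over a private unix socket. A minimal client sketch, not part of this commit, assuming a hypothetical socket path (the real path comes from conf.Global.Control.SockPath):

package main

import (
    "context"
    "fmt"
    "io"
    "net"
    "net/http"
)

func main() {
    const sockPath = "/var/run/zrepl/control" // assumption: the actual path is configured

    // Route every request through the unix socket; the URL host is a dummy.
    client := &http.Client{
        Transport: &http.Transport{
            DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
                return (&net.Dialer{}).DialContext(ctx, "unix", sockPath)
            },
        },
    }

    resp, err := client.Get("http://zrepl/status") // ControlJobEndpointStatus
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    body, _ := io.ReadAll(resp.Body)
    fmt.Println(resp.Status, string(body))
}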
daemon/daemon.go (new file)
@@ -0,0 +1,176 @@
package daemon

import (
    "context"
    "fmt"
    "github.com/zrepl/zrepl/daemon/job"
    "github.com/zrepl/zrepl/logger"
    "github.com/zrepl/zrepl/version"
    "os"
    "os/signal"
    "strings"
    "sync"
    "syscall"
    "time"

    "github.com/zrepl/zrepl/cmd/config"
    "github.com/zrepl/zrepl/daemon/logging"

    "github.com/pkg/errors"
)

func Run(conf config.Config) error {

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigChan
        cancel()
    }()

    outlets, err := logging.OutletsFromConfig(conf.Global.Logging)
    if err != nil {
        return errors.Wrap(err, "cannot build logging from config")
    }

    confJobs, err := job.JobsFromConfig(conf)
    if err != nil {
        return errors.Wrap(err, "cannot build jobs from config")
    }

    log := logger.NewLogger(outlets, 1*time.Second)
    log.Info(version.NewZreplVersionInformation().String())

    for _, job := range confJobs {
        if IsInternalJobName(job.Name()) {
            panic(fmt.Sprintf("internal job name used for config job '%s'", job.Name())) //FIXME
        }
    }

    ctx = job.WithLogger(ctx, log)

    jobs := newJobs()

    // start control socket
    controlJob, err := newControlJob(conf.Global.Control.SockPath, jobs)
    if err != nil {
        panic(err) // FIXME
    }
    jobs.start(ctx, controlJob, true)

    // start prometheus
    //var promJob *prometheusJob // FIXME
    //jobs.start(ctx, promJob, true)

    log.Info("starting daemon")

    // start regular jobs
    for _, j := range confJobs {
        jobs.start(ctx, j, false)
    }

    select {
    case <-jobs.wait():
        log.Info("all jobs finished")
    case <-ctx.Done():
        log.WithError(ctx.Err()).Info("context finished")
    }
    log.Info("daemon exiting")
    return nil
}

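// jobs is the daemon's registry of running jobs: wg tracks their goroutines,
// the jobs map backs the control socket's /status endpoint, and wakeups keeps
// the per-job wakeup channels created in start().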
type jobs struct {
    wg sync.WaitGroup

    // m protects all fields below it
    m       sync.RWMutex
    wakeups map[string]job.WakeupChan // by JobName
    jobs    map[string]job.Job
}

func newJobs() *jobs {
    return &jobs{
        wakeups: make(map[string]job.WakeupChan),
        jobs:    make(map[string]job.Job),
    }
}

const (
    logJobField    string = "job"
    logTaskField   string = "task"
    logSubsysField string = "subsystem"
)

func (s *jobs) wait() <-chan struct{} {
    ch := make(chan struct{})
    go func() {
        s.wg.Wait()
        close(ch) // signal waiters once all job goroutines have returned
    }()
    return ch
}

func (s *jobs) status() map[string]interface{} {
    s.m.RLock()
    defer s.m.RUnlock()

    type res struct {
        name   string
        status interface{}
    }
    var wg sync.WaitGroup
    c := make(chan res, len(s.jobs))
    for name, j := range s.jobs {
        wg.Add(1)
        go func(name string, j job.Job) {
            defer wg.Done()
            c <- res{name: name, status: j.Status()}
        }(name, j)
    }
    wg.Wait()
    close(c)
    ret := make(map[string]interface{}, len(s.jobs))
    for res := range c {
        ret[res.name] = res.status
    }
    return ret
}

const (
    jobNamePrometheus = "_prometheus"
    jobNameControl    = "_control"
)

func IsInternalJobName(s string) bool {
    return strings.HasPrefix(s, "_")
}

func (s *jobs) start(ctx context.Context, j job.Job, internal bool) {
    s.m.Lock()
    defer s.m.Unlock()

    jobLog := job.GetLogger(ctx).WithField(logJobField, j.Name())
    jobName := j.Name()
    if !internal && IsInternalJobName(jobName) {
        panic(fmt.Sprintf("internal job name used for non-internal job %s", jobName))
    }
    if internal && !IsInternalJobName(jobName) {
        panic(fmt.Sprintf("internal job does not use internal job name %s", jobName))
    }
    if _, ok := s.jobs[jobName]; ok {
        panic(fmt.Sprintf("duplicate job name %s", jobName))
    }
    s.jobs[jobName] = j
    ctx = job.WithLogger(ctx, jobLog)
    ctx, wakeupChan := job.WithWakeup(ctx)
    s.wakeups[jobName] = wakeupChan

    s.wg.Add(1)
    go func() {
        defer s.wg.Done()
        jobLog.Info("starting job")
        defer jobLog.Info("job exited")
        j.Run(ctx)
    }()
}
daemon/job/build_jobs.go (new file)
@@ -0,0 +1,27 @@
package job

import (
    "fmt"

    "github.com/zrepl/zrepl/cmd/config"
)

func JobsFromConfig(c config.Config) ([]Job, error) {
    js := make([]Job, len(c.Jobs))
    for i := range c.Jobs {
        j, err := buildJob(c.Global, c.Jobs[i])
        if err != nil {
            return nil, err
        }
        js[i] = j
    }
    return js, nil
}

func buildJob(c config.Global, in config.JobEnum) (j Job, err error) {

    switch v := in.Ret.(type) {
    default:
        panic(fmt.Sprintf("implementation error: unknown job type %s", v))
    }

}
daemon/job/job.go (new file)
@@ -0,0 +1,47 @@
package job

import (
    "context"

    "github.com/zrepl/zrepl/logger"
)

type Logger = logger.Logger

type contextKey int

const (
    contextKeyLog contextKey = iota
    contextKeyWakeup
)

func GetLogger(ctx context.Context) Logger {
    if l, ok := ctx.Value(contextKeyLog).(Logger); ok {
        return l
    }
    return logger.NewNullLogger()
}

func WithLogger(ctx context.Context, l Logger) context.Context {
    return context.WithValue(ctx, contextKeyLog, l)
}

func WithWakeup(ctx context.Context) (context.Context, WakeupChan) {
    wc := make(chan struct{}, 1)
    // store the channel as WakeupChan so that WaitWakeup's type assertion matches
    return context.WithValue(ctx, contextKeyWakeup, WakeupChan(wc)), wc
}

type Job interface {
    Name() string
    Run(ctx context.Context)
    Status() interface{}
}

type WakeupChan <-chan struct{}

// WaitWakeup returns the wakeup channel stored in ctx, or a channel that never
// fires if the context carries none.
func WaitWakeup(ctx context.Context) WakeupChan {
    wc, ok := ctx.Value(contextKeyWakeup).(WakeupChan)
    if !ok {
        wc = make(chan struct{})
    }
    return wc
}
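A hypothetical example, not part of this commit, of how a concrete job is expected to consume this interface and the context helpers: Run blocks until the context is cancelled and reacts to wakeups delivered via WithWakeup/WaitWakeup.

package job

import (
    "context"
    "time"
)

// noopJob is an illustrative assumption, not a real zrepl job type.
type noopJob struct {
    name string
}

var _ Job = (*noopJob)(nil)

func (j *noopJob) Name() string        { return j.name }
func (j *noopJob) Status() interface{} { return nil }

func (j *noopJob) Run(ctx context.Context) {
    log := GetLogger(ctx)
    for {
        select {
        case <-ctx.Done():
            log.Info("context done, job stopping")
            return
        case <-WaitWakeup(ctx):
            log.Info("woken up, running one iteration")
        case <-time.After(10 * time.Minute):
            log.Info("periodic iteration")
        }
    }
}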
daemon/logging/build_logging.go (new file)
@@ -0,0 +1,181 @@
package logging

import (
    "crypto/tls"
    "crypto/x509"
    "os"

    "github.com/mattn/go-isatty"
    "github.com/pkg/errors"
    "github.com/zrepl/zrepl/cmd/config"
    "github.com/zrepl/zrepl/cmd/tlsconf"
    "github.com/zrepl/zrepl/logger"
)

func OutletsFromConfig(in []config.LoggingOutletEnum) (*logger.Outlets, error) {

    outlets := logger.NewOutlets()

    if len(in) == 0 {
        // Default config
        out := WriterOutlet{&HumanFormatter{}, os.Stdout}
        outlets.Add(out, logger.Warn)
        return outlets, nil
    }

    var syslogOutlets, stdoutOutlets int
    for lei, le := range in {

        outlet, minLevel, err := parseOutlet(le)
        if err != nil {
            return nil, errors.Wrapf(err, "cannot parse outlet #%d", lei)
        }
        var _ logger.Outlet = WriterOutlet{}
        var _ logger.Outlet = &SyslogOutlet{}
        switch outlet.(type) {
        case *SyslogOutlet:
            syslogOutlets++
        case WriterOutlet:
            stdoutOutlets++
        }

        outlets.Add(outlet, minLevel)

    }

    if syslogOutlets > 1 {
        return nil, errors.Errorf("can only define one 'syslog' outlet")
    }
    if stdoutOutlets > 1 {
        return nil, errors.Errorf("can only define one 'stdout' outlet")
    }

    return outlets, nil

}

func parseLogFormat(i interface{}) (f EntryFormatter, err error) {
    var is string
    switch j := i.(type) {
    case string:
        is = j
    default:
        return nil, errors.Errorf("invalid log format: wrong type: %T", i)
    }

    switch is {
    case "human":
        return &HumanFormatter{}, nil
    case "logfmt":
        return &LogfmtFormatter{}, nil
    case "json":
        return &JSONFormatter{}, nil
    default:
        return nil, errors.Errorf("invalid log format: '%s'", is)
    }

}

func parseOutlet(in config.LoggingOutletEnum) (o logger.Outlet, level logger.Level, err error) {

    parseCommon := func(common config.LoggingOutletCommon) (logger.Level, EntryFormatter, error) {
        if common.Level == "" || common.Format == "" {
            return 0, nil, errors.Errorf("must specify 'level' and 'format' field")
        }

        minLevel, err := logger.ParseLevel(common.Level)
        if err != nil {
            return 0, nil, errors.Wrap(err, "cannot parse 'level' field")
        }
        formatter, err := parseLogFormat(common.Format)
        if err != nil {
            return 0, nil, errors.Wrap(err, "cannot parse 'format' field")
        }
        return minLevel, formatter, nil
    }

    var f EntryFormatter

    switch v := in.Ret.(type) {
    case config.StdoutLoggingOutlet:
        level, f, err = parseCommon(v.LoggingOutletCommon)
        if err != nil {
            break
        }
        o, err = parseStdoutOutlet(v, f)
    case config.TCPLoggingOutlet:
        level, f, err = parseCommon(v.LoggingOutletCommon)
        if err != nil {
            break
        }
        o, err = parseTCPOutlet(v, f)
    case config.SyslogLoggingOutlet:
        level, f, err = parseCommon(v.LoggingOutletCommon)
        if err != nil {
            break
        }
        o, err = parseSyslogOutlet(v, f)
    default:
        panic(v)
    }
    return o, level, err
}

func parseStdoutOutlet(in config.StdoutLoggingOutlet, formatter EntryFormatter) (WriterOutlet, error) {
    flags := MetadataAll
    writer := os.Stdout
    if !isatty.IsTerminal(writer.Fd()) && !in.Time {
        flags &= ^MetadataTime
    }

    formatter.SetMetadataFlags(flags)
    return WriterOutlet{
        formatter,
        os.Stdout,
    }, nil
}

func parseTCPOutlet(in config.TCPLoggingOutlet, formatter EntryFormatter) (out *TCPOutlet, err error) {
    var tlsConfig *tls.Config
    if in.TLS != nil {
        tlsConfig, err = func(m *config.TCPLoggingOutletTLS, host string) (*tls.Config, error) {
            clientCert, err := tls.LoadX509KeyPair(m.Cert, m.Key)
            if err != nil {
                return nil, errors.Wrap(err, "cannot load client cert")
            }

            var rootCAs *x509.CertPool
            if m.CA == "" {
                if rootCAs, err = x509.SystemCertPool(); err != nil {
                    return nil, errors.Wrap(err, "cannot open system cert pool")
                }
            } else {
                rootCAs, err = tlsconf.ParseCAFile(m.CA)
                if err != nil {
                    return nil, errors.Wrap(err, "cannot parse CA cert")
                }
            }
            if rootCAs == nil {
                panic("invariant violated")
            }

            return tlsconf.ClientAuthClient(host, rootCAs, clientCert)
        }(in.TLS, in.Address)
        if err != nil {
            return nil, errors.New("cannot parse TLS config in field 'tls'")
        }
    }

    formatter.SetMetadataFlags(MetadataAll)
    return NewTCPOutlet(formatter, in.Net, in.Address, tlsConfig, in.RetryInterval), nil

}

func parseSyslogOutlet(in config.SyslogLoggingOutlet, formatter EntryFormatter) (out *SyslogOutlet, err error) {
    out = &SyslogOutlet{}
    out.Formatter = formatter
    out.Formatter.SetMetadataFlags(MetadataNone)
    out.RetryInterval = in.RetryInterval
    return out, nil
}
daemon/logging/logging_formatters.go (new file)
@@ -0,0 +1,208 @@
package logging

import (
    "bytes"
    "encoding/json"
    "fmt"
    "time"

    "github.com/go-logfmt/logfmt"
    "github.com/pkg/errors"
    "github.com/zrepl/zrepl/logger"
)

const (
    FieldLevel   = "level"
    FieldMessage = "msg"
    FieldTime    = "time"
)

const (
    logJobField    string = "job"
    logTaskField   string = "task"
    logSubsysField string = "subsystem"
)

type MetadataFlags int64

const (
    MetadataTime MetadataFlags = 1 << iota
    MetadataLevel

    MetadataNone MetadataFlags = 0
    MetadataAll  MetadataFlags = ^0
)

type NoFormatter struct{}

func (f NoFormatter) SetMetadataFlags(flags MetadataFlags) {}

func (f NoFormatter) Format(e *logger.Entry) ([]byte, error) {
    return []byte(e.Message), nil
}

type HumanFormatter struct {
    metadataFlags MetadataFlags
    ignoreFields  map[string]bool
}

const HumanFormatterDateFormat = time.RFC3339

func (f *HumanFormatter) SetMetadataFlags(flags MetadataFlags) {
    f.metadataFlags = flags
}

func (f *HumanFormatter) SetIgnoreFields(ignore []string) {
    if ignore == nil {
        f.ignoreFields = nil
        return
    }
    f.ignoreFields = make(map[string]bool, len(ignore))

    for _, field := range ignore {
        f.ignoreFields[field] = true
    }
}

func (f *HumanFormatter) ignored(field string) bool {
    return f.ignoreFields != nil && f.ignoreFields[field]
}

func (f *HumanFormatter) Format(e *logger.Entry) (out []byte, err error) {

    var line bytes.Buffer

    if f.metadataFlags&MetadataTime != 0 {
        fmt.Fprintf(&line, "%s ", e.Time.Format(HumanFormatterDateFormat))
    }
    if f.metadataFlags&MetadataLevel != 0 {
        fmt.Fprintf(&line, "[%s]", e.Level.Short())
    }

    prefixFields := []string{logJobField, logTaskField, logSubsysField}
    prefixed := make(map[string]bool, len(prefixFields)+2)
    for _, field := range prefixFields {
        val, ok := e.Fields[field].(string)
        if !ok {
            continue
        }
        if !f.ignored(field) {
            fmt.Fprintf(&line, "[%s]", val)
            prefixed[field] = true
        }
    }

    if line.Len() > 0 {
        fmt.Fprint(&line, ": ")
    }
    fmt.Fprint(&line, e.Message)

    if len(e.Fields)-len(prefixed) > 0 {
        fmt.Fprint(&line, " ")
        enc := logfmt.NewEncoder(&line)
        for field, value := range e.Fields {
            if prefixed[field] || f.ignored(field) {
                continue
            }
            if err := logfmtTryEncodeKeyval(enc, field, value); err != nil {
                return nil, err
            }
        }
    }

    return line.Bytes(), nil
}

type JSONFormatter struct {
    metadataFlags MetadataFlags
}

func (f *JSONFormatter) SetMetadataFlags(flags MetadataFlags) {
    f.metadataFlags = flags
}

func (f *JSONFormatter) Format(e *logger.Entry) ([]byte, error) {
    data := make(logger.Fields, len(e.Fields)+3)
    for k, v := range e.Fields {
        switch v := v.(type) {
        case error:
            // Otherwise errors are ignored by `encoding/json`
            // https://github.com/sirupsen/logrus/issues/137
            data[k] = v.Error()
        default:
            _, err := json.Marshal(v)
            if err != nil {
                return nil, errors.Errorf("field is not JSON encodable: %s", k)
            }
            data[k] = v
        }
    }

    data[FieldMessage] = e.Message
    data[FieldTime] = e.Time.Format(time.RFC3339)
    data[FieldLevel] = e.Level

    return json.Marshal(data)

}

type LogfmtFormatter struct {
    metadataFlags MetadataFlags
}

func (f *LogfmtFormatter) SetMetadataFlags(flags MetadataFlags) {
    f.metadataFlags = flags
}

func (f *LogfmtFormatter) Format(e *logger.Entry) ([]byte, error) {
    var buf bytes.Buffer
    enc := logfmt.NewEncoder(&buf)

    if f.metadataFlags&MetadataTime != 0 {
        enc.EncodeKeyval(FieldTime, e.Time)
    }
    if f.metadataFlags&MetadataLevel != 0 {
        enc.EncodeKeyval(FieldLevel, e.Level)
    }

    // at least try and put job and task in front
    prefixed := make(map[string]bool, 2)
    prefix := []string{logJobField, logTaskField, logSubsysField}
    for _, pf := range prefix {
        v, ok := e.Fields[pf]
        if !ok {
            break
        }
        if err := logfmtTryEncodeKeyval(enc, pf, v); err != nil {
            return nil, err // unlikely
        }
        prefixed[pf] = true
    }

    enc.EncodeKeyval(FieldMessage, e.Message)

    for k, v := range e.Fields {
        if !prefixed[k] {
            if err := logfmtTryEncodeKeyval(enc, k, v); err != nil {
                return nil, err
            }
        }
    }

    return buf.Bytes(), nil
}

func logfmtTryEncodeKeyval(enc *logfmt.Encoder, field, value interface{}) error {

    err := enc.EncodeKeyval(field, value)
    switch err {
    case nil: // ok
        return nil
    case logfmt.ErrUnsupportedValueType:
        enc.EncodeKeyval(field, fmt.Sprintf("<%T>", value))
        return nil
    }
    return errors.Wrapf(err, "cannot encode field '%s'", field)

}
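A small driver, an assumption rather than part of the commit, to illustrate the line shape HumanFormatter.Format produces: RFC3339 time, bracketed level and job/task/subsystem prefixes, the message, then the remaining fields in logfmt form. The logger.Entry field names and logger.Info constant are taken from how they are used elsewhere in this diff.

package logging

import (
    "fmt"
    "time"

    "github.com/zrepl/zrepl/logger"
)

func exampleHumanFormat() {
    f := &HumanFormatter{}
    f.SetMetadataFlags(MetadataAll)
    e := logger.Entry{
        Level:   logger.Info,
        Message: "starting replication",
        Time:    time.Now(),
        Fields:  logger.Fields{logJobField: "mirror", "bytes": 12345},
    }
    line, err := f.Format(&e)
    if err != nil {
        panic(err)
    }
    // e.g. "2018-08-26T15:04:05+02:00 [INFO][mirror]: starting replication bytes=12345"
    // (the exact level string comes from logger.Level.Short())
    fmt.Println(string(line))
}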
daemon/logging/logging_outlets.go (new file)
@@ -0,0 +1,167 @@
package logging

import (
    "bytes"
    "context"
    "crypto/tls"
    "io"
    "log/syslog"
    "net"
    "time"

    "github.com/pkg/errors"
    "github.com/zrepl/zrepl/logger"
)

type EntryFormatter interface {
    SetMetadataFlags(flags MetadataFlags)
    Format(e *logger.Entry) ([]byte, error)
}

type WriterOutlet struct {
    formatter EntryFormatter
    writer    io.Writer
}

func (h WriterOutlet) WriteEntry(entry logger.Entry) error {
    bytes, err := h.formatter.Format(&entry)
    if err != nil {
        return err
    }
    _, err = h.writer.Write(bytes)
    h.writer.Write([]byte("\n"))
    return err
}

type TCPOutlet struct {
    formatter EntryFormatter
    connect   func(ctx context.Context) (net.Conn, error)
    entryChan chan *bytes.Buffer
}

// NewTCPOutlet ships formatted entries to address over network, optionally TLS-wrapped.
// retryInterval specifies how much time must pass between a connection error and a
// reconnection attempt; log entries written to the outlet during that interval are
// silently dropped.
func NewTCPOutlet(formatter EntryFormatter, network, address string, tlsConfig *tls.Config, retryInterval time.Duration) *TCPOutlet {

    connect := func(ctx context.Context) (conn net.Conn, err error) {
        deadl, ok := ctx.Deadline()
        if !ok {
            deadl = time.Time{}
        }
        dialer := net.Dialer{
            Deadline: deadl,
        }
        if tlsConfig != nil {
            conn, err = tls.DialWithDialer(&dialer, network, address, tlsConfig)
        } else {
            conn, err = dialer.DialContext(ctx, network, address)
        }
        return
    }

    entryChan := make(chan *bytes.Buffer, 1) // allow one message in flight while previous is in io.Copy()

    o := &TCPOutlet{
        formatter: formatter,
        connect:   connect,
        entryChan: entryChan,
    }

    go o.outLoop(retryInterval)

    return o
}

// FIXME: use this method
func (h *TCPOutlet) Close() {
    close(h.entryChan)
}

func (h *TCPOutlet) outLoop(retryInterval time.Duration) {

    var retry time.Time
    var conn net.Conn
    for msg := range h.entryChan {
        var err error
        for conn == nil {
            time.Sleep(time.Until(retry))
            ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(retryInterval))
            conn, err = h.connect(ctx)
            cancel()
            if err != nil {
                retry = time.Now().Add(retryInterval)
                conn = nil
            }
        }
        conn.SetWriteDeadline(time.Now().Add(retryInterval))
        _, err = io.Copy(conn, msg)
        if err != nil {
            retry = time.Now().Add(retryInterval)
            conn.Close()
            conn = nil
        }
    }
}

func (h *TCPOutlet) WriteEntry(e logger.Entry) error {

    ebytes, err := h.formatter.Format(&e)
    if err != nil {
        return err
    }

    buf := new(bytes.Buffer)
    buf.Write(ebytes)
    buf.WriteString("\n")

    select {
    case h.entryChan <- buf:
        return nil
    default:
        return errors.New("connection broken or not fast enough")
    }
}

type SyslogOutlet struct {
    Formatter          EntryFormatter
    RetryInterval      time.Duration
    writer             *syslog.Writer
    lastConnectAttempt time.Time
}

func (o *SyslogOutlet) WriteEntry(entry logger.Entry) error {

    bytes, err := o.Formatter.Format(&entry)
    if err != nil {
        return err
    }

    s := string(bytes)

    if o.writer == nil {
        now := time.Now()
        if now.Sub(o.lastConnectAttempt) < o.RetryInterval {
            return nil // not an error toward logger
        }
        o.writer, err = syslog.New(syslog.LOG_LOCAL0, "zrepl")
        o.lastConnectAttempt = time.Now()
        if err != nil {
            o.writer = nil
            return err
        }
    }

    switch entry.Level {
    case logger.Debug:
        return o.writer.Debug(s)
    case logger.Info:
        return o.writer.Info(s)
    case logger.Warn:
        return o.writer.Warning(s)
    case logger.Error:
        return o.writer.Err(s)
    default:
        return o.writer.Err(s) // write as error as reaching this case is in fact an error
    }

}
daemon/main.go (new file)
@@ -0,0 +1,8 @@
package daemon

import (
    "github.com/zrepl/zrepl/logger"
)

type Logger = logger.Logger

daemon/pprof.go (new file)
@@ -0,0 +1,80 @@
package daemon

import (
    "context"
    "net"
    "net/http"
    // FIXME: importing net/http/pprof has the side effect of poisoning the
    // FIXME: http.DefaultServeMux with the /debug/pprof endpoints
    "net/http/pprof"
)

type PProfServer struct {
    cc       chan PprofServerControlMsg
    state    PprofServerControlMsg
    listener net.Listener
}

type PprofServerControlMsg struct {
    // Whether the server should listen for requests on the given address
    Run bool
    // Must be set if Run is true, undefined otherwise
    HttpListenAddress string
}

func NewPProfServer(ctx context.Context) *PProfServer {

    s := &PProfServer{
        cc: make(chan PprofServerControlMsg),
    }

    go s.controlLoop(ctx)
    return s
}

func (s *PProfServer) controlLoop(ctx context.Context) {
outer:
    for {

        var msg PprofServerControlMsg
        select {
        case <-ctx.Done():
            if s.listener != nil {
                s.listener.Close()
            }
            break outer
        case msg = <-s.cc:
            // proceed
        }

        var err error
        if msg.Run && s.listener == nil {

            s.listener, err = net.Listen("tcp", msg.HttpListenAddress)
            if err != nil {
                s.listener = nil
                continue
            }

            // FIXME: because net/http/pprof does not provide a mux,
            // register its handlers on a private mux here
            mux := http.NewServeMux()
            mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
            mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
            mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
            mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
            mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
            go http.Serve(s.listener, mux)
            continue
        }

        if !msg.Run && s.listener != nil {
            s.listener.Close()
            s.listener = nil
            continue
        }
    }
}

func (s *PProfServer) Control(msg PprofServerControlMsg) {
    s.cc <- msg
}
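Together with the control job above, this is how profiling becomes reachable at runtime: a client sends a JSON-encoded PprofServerControlMsg, e.g. {"Run": true, "HttpListenAddress": "127.0.0.1:6060"} (the address here is only an example), to the control socket's /debug/pprof endpoint, and the PProfServer then serves the net/http/pprof handlers on that TCP address until it receives {"Run": false}.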
daemon/prometheus.go (new file)
@@ -0,0 +1,82 @@
package daemon

import (
    "context"
    "net"
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/zrepl/zrepl/daemon/job"
    "github.com/zrepl/zrepl/zfs"
)

type prometheusJob struct {
    listen string
}

func newPrometheusJob(listen string) *prometheusJob {
    return &prometheusJob{listen}
}

var prom struct {
    taskLastActiveStart    *prometheus.GaugeVec
    taskLastActiveDuration *prometheus.GaugeVec
    taskLogEntries         *prometheus.CounterVec
}

func init() {
    prom.taskLastActiveStart = prometheus.NewGaugeVec(prometheus.GaugeOpts{
        Namespace: "zrepl",
        Subsystem: "daemon",
        Name:      "task_last_active_start",
        Help:      "point in time at which the job task last left idle state",
    }, []string{"zrepl_job", "job_type", "task"})
    prom.taskLastActiveDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{
        Namespace: "zrepl",
        Subsystem: "daemon",
        Name:      "task_last_active_duration",
        Help:      "seconds that the last run of a job task spent between leaving and re-entering idle state",
    }, []string{"zrepl_job", "job_type", "task"})
    prom.taskLogEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
        Namespace: "zrepl",
        Subsystem: "daemon",
        Name:      "task_log_entries",
        Help:      "number of log entries per job task and level",
    }, []string{"zrepl_job", "job_type", "task", "level"})
    prometheus.MustRegister(prom.taskLastActiveStart)
    prometheus.MustRegister(prom.taskLastActiveDuration)
    prometheus.MustRegister(prom.taskLogEntries)
}

func (j *prometheusJob) Name() string { return jobNamePrometheus }

func (j *prometheusJob) Status() interface{} { return nil }

func (j *prometheusJob) Run(ctx context.Context) {

    if err := zfs.PrometheusRegister(prometheus.DefaultRegisterer); err != nil {
        panic(err)
    }

    log := job.GetLogger(ctx)

    l, err := net.Listen("tcp", j.listen)
    if err != nil {
        log.WithError(err).Error("cannot listen")
        return
    }
    go func() {
        select {
        case <-ctx.Done():
            l.Close()
        }
    }()

    mux := http.NewServeMux()
    mux.Handle("/metrics", promhttp.Handler())

    err = http.Serve(l, mux)
    if err != nil {
        log.WithError(err).Error("error while serving")
    }

}
main.go
@@ -2,13 +2,51 @@
 package main
 
 import (
-    "github.com/zrepl/zrepl/cmd"
     "log"
     "os"
+    "github.com/spf13/cobra"
+    "github.com/zrepl/zrepl/daemon"
+    "github.com/zrepl/zrepl/cmd/config"
 )
 
+var rootCmd = &cobra.Command{
+    Use:   "zrepl",
+    Short: "ZFS dataset replication",
+    Long: `Replicate ZFS filesystems & volumes between pools:
+
+  - push & pull mode
+  - automatic snapshot creation & pruning
+  - local / over the network
+  - ACLs instead of blank SSH access`,
+}
+
+var daemonCmd = &cobra.Command{
+    Use:   "daemon",
+    Short: "daemon",
+    RunE: func(cmd *cobra.Command, args []string) error {
+        conf, err := config.ParseConfig(rootArgs.configFile)
+        if err != nil {
+            return err
+        }
+        return daemon.Run(conf)
+    },
+}
+
+var rootArgs struct {
+    configFile string
+}
+
+func init() {
+    //cobra.OnInitialize(initConfig)
+    rootCmd.PersistentFlags().StringVar(&rootArgs.configFile, "config", "", "config file path")
+    rootCmd.AddCommand(daemonCmd)
+}
+
 func main() {
-    if err := cmd.RootCmd.Execute(); err != nil {
+    if err := rootCmd.Execute(); err != nil {
         log.Printf("error executing root command: %s", err)
         os.Exit(1)
     }