diff --git a/daemon/control.go b/daemon/control.go
new file mode 100644
index 0000000..0aff1ce
--- /dev/null
+++ b/daemon/control.go
@@ -0,0 +1,142 @@
+package daemon
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"github.com/pkg/errors"
+	"github.com/zrepl/zrepl/daemon/job"
+	"github.com/zrepl/zrepl/cmd/helpers"
+	"github.com/zrepl/zrepl/logger"
+	"github.com/zrepl/zrepl/version"
+	"io"
+	"net"
+	"net/http"
+)
+
+type controlJob struct {
+	sockaddr *net.UnixAddr
+	jobs     *jobs
+}
+
+func newControlJob(sockpath string, jobs *jobs) (j *controlJob, err error) {
+	j = &controlJob{jobs: jobs}
+
+	j.sockaddr, err = net.ResolveUnixAddr("unix", sockpath)
+	if err != nil {
+		err = errors.Wrap(err, "cannot resolve unix address")
+		return
+	}
+
+	return
+}
+
+func (j *controlJob) Name() string { return jobNameControl }
+
+func (j *controlJob) Status() interface{} { return nil }
+
+const (
+	ControlJobEndpointPProf   string = "/debug/pprof"
+	ControlJobEndpointVersion string = "/version"
+	ControlJobEndpointStatus  string = "/status"
+)
+
+func (j *controlJob) Run(ctx context.Context) {
+
+	log := job.GetLogger(ctx)
+	defer log.Info("control job finished")
+
+	l, err := helpers.ListenUnixPrivate(j.sockaddr)
+	if err != nil {
+		log.WithError(err).Error("error listening")
+		return
+	}
+
+	pprofServer := NewPProfServer(ctx)
+
+	mux := http.NewServeMux()
+	mux.Handle(ControlJobEndpointPProf, requestLogger{log: log, handlerFunc: func(w http.ResponseWriter, r *http.Request) {
+		var msg PprofServerControlMsg
+		err := json.NewDecoder(r.Body).Decode(&msg)
+		if err != nil {
+			log.WithError(err).Error("bad pprof request from client")
+			w.WriteHeader(http.StatusBadRequest)
+			return
+		}
+		pprofServer.Control(msg)
+		w.WriteHeader(http.StatusOK)
+	}})
+	mux.Handle(ControlJobEndpointVersion,
+		requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
+			return version.NewZreplVersionInformation(), nil
+		}}})
+	mux.Handle(ControlJobEndpointStatus,
+		requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
+			s := j.jobs.status()
+			return s, nil
+		}}})
+	server := http.Server{Handler: mux}
+
+outer:
+	for {
+
+		served := make(chan error)
+		go func() {
+			served <- server.Serve(l)
+			close(served)
+		}()
+
+		select {
+		case <-ctx.Done():
+			log.WithError(ctx.Err()).Info("context done")
+			server.Shutdown(context.Background())
+			break outer
+		case err = <-served:
+			if err != nil {
+				log.WithError(err).Error("error serving")
+				break outer
+			}
+		}
+
+	}
+
+}
+
+type jsonResponder struct {
+	producer func() (interface{}, error)
+}
+
+func (j jsonResponder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	res, err := j.producer()
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		io.WriteString(w, err.Error())
+		return
+	}
+	var buf bytes.Buffer
+	err = json.NewEncoder(&buf).Encode(res)
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		io.WriteString(w, err.Error())
+	} else {
+		io.Copy(w, &buf)
+	}
+}
+
+type requestLogger struct {
+	log         logger.Logger
+	handler     http.Handler
+	handlerFunc http.HandlerFunc
+}
+
+func (l requestLogger) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	log := l.log.WithField("method", r.Method).WithField("url", r.URL)
+	log.Info("start")
+	if l.handlerFunc != nil {
+		l.handlerFunc(w, r)
+	} else if l.handler != nil {
+		l.handler.ServeHTTP(w, r)
+	} else {
+		log.Error("no handler or handlerFunc configured")
+	}
+	log.Info("finish")
+}
diff --git a/daemon/daemon.go b/daemon/daemon.go
new file mode 100644
index 0000000..a408aa3
--- /dev/null
+++ b/daemon/daemon.go
@@ -0,0 +1,176 @@
+package daemon
+
+import (
+	"context"
+	"fmt"
+	"github.com/zrepl/zrepl/daemon/job"
+	"github.com/zrepl/zrepl/logger"
+	"github.com/zrepl/zrepl/version"
+	"os"
+	"os/signal"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+	"github.com/zrepl/zrepl/cmd/config"
+	"github.com/zrepl/zrepl/daemon/logging"
+	"github.com/pkg/errors"
+)
+
+func Run(conf config.Config) error {
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+	go func() {
+		<-sigChan
+		cancel()
+	}()
+
+	outlets, err := logging.OutletsFromConfig(conf.Global.Logging)
+	if err != nil {
+		return errors.Wrap(err, "cannot build logging from config")
+	}
+
+	confJobs, err := job.JobsFromConfig(conf)
+	if err != nil {
+		return errors.Wrap(err, "cannot build jobs from config")
+	}
+
+	log := logger.NewLogger(outlets, 1*time.Second)
+	log.Info(version.NewZreplVersionInformation().String())
+
+	for _, j := range confJobs {
+		if IsInternalJobName(j.Name()) {
+			panic(fmt.Sprintf("internal job name used for config job '%s'", j.Name())) // FIXME
+		}
+	}
+
+	ctx = job.WithLogger(ctx, log)
+
+	jobs := newJobs()
+
+	// start control socket
+	controlJob, err := newControlJob(conf.Global.Control.SockPath, jobs)
+	if err != nil {
+		panic(err) // FIXME
+	}
+	jobs.start(ctx, controlJob, true)
+
+	// start prometheus
+	//var promJob *prometheusJob // FIXME
+	//jobs.start(ctx, promJob, true)
+
+	log.Info("starting daemon")
+
+	// start regular jobs
+	for _, j := range confJobs {
+		jobs.start(ctx, j, false)
+	}
+
+	select {
+	case <-jobs.wait():
+		log.Info("all jobs finished")
+	case <-ctx.Done():
+		log.WithError(ctx.Err()).Info("context finished")
+	}
+	log.Info("daemon exiting")
+	return nil
+}
+
+type jobs struct {
+	wg sync.WaitGroup
+
+	// m protects all fields below it
+	m       sync.RWMutex
+	wakeups map[string]job.WakeupChan // by JobName
+	jobs    map[string]job.Job
+}
+
+func newJobs() *jobs {
+	return &jobs{
+		wakeups: make(map[string]job.WakeupChan),
+		jobs:    make(map[string]job.Job),
+	}
+}
+
+const (
+	logJobField    string = "job"
+	logTaskField   string = "task"
+	logSubsysField string = "subsystem"
+)
+
+func (s *jobs) wait() <-chan struct{} {
+	ch := make(chan struct{})
+	go func() {
+		s.wg.Wait()
+		close(ch) // signal that all jobs have exited
+	}()
+	return ch
+}
+
+func (s *jobs) status() map[string]interface{} {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	type res struct {
+		name   string
+		status interface{}
+	}
+	var wg sync.WaitGroup
+	c := make(chan res, len(s.jobs))
+	for name, j := range s.jobs {
+		wg.Add(1)
+		go func(name string, j job.Job) {
+			defer wg.Done()
+			c <- res{name: name, status: j.Status()}
+		}(name, j)
+	}
+	wg.Wait()
+	close(c)
+	ret := make(map[string]interface{}, len(s.jobs))
+	for res := range c {
+		ret[res.name] = res.status
+	}
+	return ret
+}
+
+const (
+	jobNamePrometheus = "_prometheus"
+	jobNameControl    = "_control"
+)
+
+func IsInternalJobName(s string) bool {
+	return strings.HasPrefix(s, "_")
+}
+
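+// start registers the job under its name and runs it in a goroutine tracked by
+// the jobs wait group; the internal flag guards the "_" name prefix reserved
+// for built-in jobs such as the control and prometheus jobs.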
panic(fmt.Sprintf("duplicate job name %s", jobName)) + } + s.jobs[jobName] = j + ctx = job.WithLogger(ctx, jobLog) + ctx, wakeupChan := job.WithWakeup(ctx) + s.wakeups[jobName] = wakeupChan + + s.wg.Add(1) + go func() { + defer s.wg.Done() + jobLog.Info("starting job") + defer jobLog.Info("job exited") + j.Run(ctx) + }() +} diff --git a/daemon/job/build_jobs.go b/daemon/job/build_jobs.go new file mode 100644 index 0000000..b36fc90 --- /dev/null +++ b/daemon/job/build_jobs.go @@ -0,0 +1,27 @@ +package job + +import ( + "github.com/zrepl/zrepl/cmd/config" + "fmt" +) + +func JobsFromConfig(c config.Config) ([]Job, error) { + js := make([]Job, len(c.Jobs)) + for i := range c.Jobs { + j, err := buildJob(c.Global, c.Jobs[i]) + if err != nil { + return nil, err + } + js[i] = j + } + return js, nil +} + +func buildJob(c config.Global, in config.JobEnum) (j Job, err error) { + + switch v := in.Ret.(type) { + default: + panic(fmt.Sprintf("implementation error: unknown job type %s", v)) + } + +} \ No newline at end of file diff --git a/daemon/job/job.go b/daemon/job/job.go new file mode 100644 index 0000000..56e25af --- /dev/null +++ b/daemon/job/job.go @@ -0,0 +1,47 @@ +package job + +import ( + "context" + "github.com/zrepl/zrepl/logger" +) + +type Logger = logger.Logger + +type contextKey int + +const ( + contextKeyLog contextKey = iota + contextKeyWakeup +) + +func GetLogger(ctx context.Context) Logger { + if l, ok := ctx.Value(contextKeyLog).(Logger); ok { + return l + } + return logger.NewNullLogger() +} + +func WithLogger(ctx context.Context, l Logger) context.Context { + return context.WithValue(ctx, contextKeyLog, l) +} + +func WithWakeup(ctx context.Context) (context.Context, WakeupChan) { + wc := make(chan struct{}, 1) + return context.WithValue(ctx, contextKeyWakeup, wc), wc +} + +type Job interface { + Name() string + Run(ctx context.Context) + Status() interface{} +} + +type WakeupChan <-chan struct{} + +func WaitWakeup(ctx context.Context) WakeupChan { + wc, ok := ctx.Value(contextKeyWakeup).(WakeupChan) + if !ok { + wc = make(chan struct{}) + } + return wc +} diff --git a/daemon/logging/build_logging.go b/daemon/logging/build_logging.go new file mode 100644 index 0000000..6e2b4d6 --- /dev/null +++ b/daemon/logging/build_logging.go @@ -0,0 +1,181 @@ +package logging + +import ( + "github.com/zrepl/zrepl/cmd/config" + "os" + "github.com/mattn/go-isatty" + "crypto/tls" + "github.com/pkg/errors" + "crypto/x509" + "github.com/zrepl/zrepl/cmd/tlsconf" + "github.com/zrepl/zrepl/logger" +) + +func OutletsFromConfig(in []config.LoggingOutletEnum) (*logger.Outlets, error) { + + outlets := logger.NewOutlets() + + if len(in) == 0 { + // Default config + out := WriterOutlet{&HumanFormatter{}, os.Stdout} + outlets.Add(out, logger.Warn) + return outlets, nil + } + + var syslogOutlets, stdoutOutlets int + for lei, le := range in { + + outlet, minLevel, err := parseOutlet(le) + if err != nil { + return nil, errors.Wrapf(err, "cannot parse outlet #%d", lei) + } + var _ logger.Outlet = WriterOutlet{} + var _ logger.Outlet = &SyslogOutlet{} + switch outlet.(type) { + case *SyslogOutlet: + syslogOutlets++ + case WriterOutlet: + stdoutOutlets++ + } + + outlets.Add(outlet, minLevel) + + } + + if syslogOutlets > 1 { + return nil, errors.Errorf("can only define one 'syslog' outlet") + } + if stdoutOutlets > 1 { + return nil, errors.Errorf("can only define one 'stdout' outlet") + } + + return outlets, nil + +} + + +func parseLogFormat(i interface{}) (f EntryFormatter, err error) { + var is string + switch j := 
+func parseLogFormat(i interface{}) (f EntryFormatter, err error) {
+	var is string
+	switch j := i.(type) {
+	case string:
+		is = j
+	default:
+		return nil, errors.Errorf("invalid log format: wrong type: %T", i)
+	}
+
+	switch is {
+	case "human":
+		return &HumanFormatter{}, nil
+	case "logfmt":
+		return &LogfmtFormatter{}, nil
+	case "json":
+		return &JSONFormatter{}, nil
+	default:
+		return nil, errors.Errorf("invalid log format: '%s'", is)
+	}
+
+}
+
+func parseOutlet(in config.LoggingOutletEnum) (o logger.Outlet, level logger.Level, err error) {
+
+	parseCommon := func(common config.LoggingOutletCommon) (logger.Level, EntryFormatter, error) {
+		if common.Level == "" || common.Format == "" {
+			return 0, nil, errors.Errorf("must specify 'level' and 'format' field")
+		}
+
+		minLevel, err := logger.ParseLevel(common.Level)
+		if err != nil {
+			return 0, nil, errors.Wrap(err, "cannot parse 'level' field")
+		}
+		formatter, err := parseLogFormat(common.Format)
+		if err != nil {
+			return 0, nil, errors.Wrap(err, "cannot parse 'format' field")
+		}
+		return minLevel, formatter, nil
+	}
+
+	var f EntryFormatter
+
+	switch v := in.Ret.(type) {
+	case config.StdoutLoggingOutlet:
+		level, f, err = parseCommon(v.LoggingOutletCommon)
+		if err != nil {
+			break
+		}
+		o, err = parseStdoutOutlet(v, f)
+	case config.TCPLoggingOutlet:
+		level, f, err = parseCommon(v.LoggingOutletCommon)
+		if err != nil {
+			break
+		}
+		o, err = parseTCPOutlet(v, f)
+	case config.SyslogLoggingOutlet:
+		level, f, err = parseCommon(v.LoggingOutletCommon)
+		if err != nil {
+			break
+		}
+		o, err = parseSyslogOutlet(v, f)
+	default:
+		panic(v)
+	}
+	return o, level, err
+}
+
+func parseStdoutOutlet(in config.StdoutLoggingOutlet, formatter EntryFormatter) (WriterOutlet, error) {
+	flags := MetadataAll
+	writer := os.Stdout
+	if !isatty.IsTerminal(writer.Fd()) && !in.Time {
+		flags &= ^MetadataTime
+	}
+
+	formatter.SetMetadataFlags(flags)
+	return WriterOutlet{
+		formatter,
+		writer,
+	}, nil
+}
+
+func parseTCPOutlet(in config.TCPLoggingOutlet, formatter EntryFormatter) (out *TCPOutlet, err error) {
+	var tlsConfig *tls.Config
+	if in.TLS != nil {
+		tlsConfig, err = func(m *config.TCPLoggingOutletTLS, host string) (*tls.Config, error) {
+			clientCert, err := tls.LoadX509KeyPair(m.Cert, m.Key)
+			if err != nil {
+				return nil, errors.Wrap(err, "cannot load client cert")
+			}
+
+			var rootCAs *x509.CertPool
+			if m.CA == "" {
+				if rootCAs, err = x509.SystemCertPool(); err != nil {
+					return nil, errors.Wrap(err, "cannot open system cert pool")
+				}
+			} else {
+				rootCAs, err = tlsconf.ParseCAFile(m.CA)
+				if err != nil {
+					return nil, errors.Wrap(err, "cannot parse CA cert")
+				}
+			}
+			if rootCAs == nil {
+				panic("invariant violated")
+			}
+
+			return tlsconf.ClientAuthClient(host, rootCAs, clientCert)
+		}(in.TLS, in.Address)
+		if err != nil {
+			return nil, errors.Wrap(err, "cannot parse TLS config in field 'tls'")
+		}
+	}
+
+	formatter.SetMetadataFlags(MetadataAll)
+	return NewTCPOutlet(formatter, in.Net, in.Address, tlsConfig, in.RetryInterval), nil
+
+}
+
+func parseSyslogOutlet(in config.SyslogLoggingOutlet, formatter EntryFormatter) (out *SyslogOutlet, err error) {
+	out = &SyslogOutlet{}
+	out.Formatter = formatter
+	out.Formatter.SetMetadataFlags(MetadataNone)
+	out.RetryInterval = in.RetryInterval
+	return out, nil
+}
diff --git a/daemon/logging/logging_formatters.go b/daemon/logging/logging_formatters.go
new file mode 100644
index 0000000..b968531
--- /dev/null
+++ b/daemon/logging/logging_formatters.go
@@ -0,0 +1,208 @@
+package logging
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"github.com/go-logfmt/logfmt"
"github.com/pkg/errors" + "github.com/zrepl/zrepl/logger" + "time" +) + +const ( + FieldLevel = "level" + FieldMessage = "msg" + FieldTime = "time" +) + +const ( + logJobField string = "job" + logTaskField string = "task" + logSubsysField string = "subsystem" +) + + +type MetadataFlags int64 + +const ( + MetadataTime MetadataFlags = 1 << iota + MetadataLevel + + MetadataNone MetadataFlags = 0 + MetadataAll MetadataFlags = ^0 +) + + +type NoFormatter struct{} + +func (f NoFormatter) SetMetadataFlags(flags MetadataFlags) {} + +func (f NoFormatter) Format(e *logger.Entry) ([]byte, error) { + return []byte(e.Message), nil +} + +type HumanFormatter struct { + metadataFlags MetadataFlags + ignoreFields map[string]bool +} + +const HumanFormatterDateFormat = time.RFC3339 + +func (f *HumanFormatter) SetMetadataFlags(flags MetadataFlags) { + f.metadataFlags = flags +} + +func (f *HumanFormatter) SetIgnoreFields(ignore []string) { + if ignore == nil { + f.ignoreFields = nil + return + } + f.ignoreFields = make(map[string]bool, len(ignore)) + + for _, field := range ignore { + f.ignoreFields[field] = true + } +} + +func (f *HumanFormatter) ignored(field string) bool { + return f.ignoreFields != nil && f.ignoreFields[field] +} + +func (f *HumanFormatter) Format(e *logger.Entry) (out []byte, err error) { + + var line bytes.Buffer + + if f.metadataFlags&MetadataTime != 0 { + fmt.Fprintf(&line, "%s ", e.Time.Format(HumanFormatterDateFormat)) + } + if f.metadataFlags&MetadataLevel != 0 { + fmt.Fprintf(&line, "[%s]", e.Level.Short()) + } + + prefixFields := []string{logJobField, logTaskField, logSubsysField} + prefixed := make(map[string]bool, len(prefixFields)+2) + for _, field := range prefixFields { + val, ok := e.Fields[field].(string) + if !ok { + continue + } + if !f.ignored(field) { + fmt.Fprintf(&line, "[%s]", val) + prefixed[field] = true + } + } + + if line.Len() > 0 { + fmt.Fprint(&line, ": ") + } + fmt.Fprint(&line, e.Message) + + if len(e.Fields)-len(prefixed) > 0 { + fmt.Fprint(&line, " ") + enc := logfmt.NewEncoder(&line) + for field, value := range e.Fields { + if prefixed[field] || f.ignored(field) { + continue + } + if err := logfmtTryEncodeKeyval(enc, field, value); err != nil { + return nil, err + } + } + } + + return line.Bytes(), nil +} + +type JSONFormatter struct { + metadataFlags MetadataFlags +} + +func (f *JSONFormatter) SetMetadataFlags(flags MetadataFlags) { + f.metadataFlags = flags +} + +func (f *JSONFormatter) Format(e *logger.Entry) ([]byte, error) { + data := make(logger.Fields, len(e.Fields)+3) + for k, v := range e.Fields { + switch v := v.(type) { + case error: + // Otherwise errors are ignored by `encoding/json` + // https://github.com/sirupsen/logrus/issues/137 + data[k] = v.Error() + default: + _, err := json.Marshal(v) + if err != nil { + return nil, errors.Errorf("field is not JSON encodable: %s", k) + } + data[k] = v + } + } + + data[FieldMessage] = e.Message + data[FieldTime] = e.Time.Format(time.RFC3339) + data[FieldLevel] = e.Level + + return json.Marshal(data) + +} + +type LogfmtFormatter struct { + metadataFlags MetadataFlags +} + +func (f *LogfmtFormatter) SetMetadataFlags(flags MetadataFlags) { + f.metadataFlags = flags +} + +func (f *LogfmtFormatter) Format(e *logger.Entry) ([]byte, error) { + var buf bytes.Buffer + enc := logfmt.NewEncoder(&buf) + + if f.metadataFlags&MetadataTime != 0 { + enc.EncodeKeyval(FieldTime, e.Time) + } + if f.metadataFlags&MetadataLevel != 0 { + enc.EncodeKeyval(FieldLevel, e.Level) + } + + // at least try and put job and task in 
+	prefixed := make(map[string]bool, 2)
+	prefix := []string{logJobField, logTaskField, logSubsysField}
+	for _, pf := range prefix {
+		v, ok := e.Fields[pf]
+		if !ok {
+			continue
+		}
+		if err := logfmtTryEncodeKeyval(enc, pf, v); err != nil {
+			return nil, err // unlikely
+		}
+		prefixed[pf] = true
+	}
+
+	enc.EncodeKeyval(FieldMessage, e.Message)
+
+	for k, v := range e.Fields {
+		if !prefixed[k] {
+			if err := logfmtTryEncodeKeyval(enc, k, v); err != nil {
+				return nil, err
+			}
+		}
+	}
+
+	return buf.Bytes(), nil
+}
+
+func logfmtTryEncodeKeyval(enc *logfmt.Encoder, field, value interface{}) error {
+
+	err := enc.EncodeKeyval(field, value)
+	switch err {
+	case nil: // ok
+		return nil
+	case logfmt.ErrUnsupportedValueType:
+		enc.EncodeKeyval(field, fmt.Sprintf("<%T>", value))
+		return nil
+	}
+	return errors.Wrapf(err, "cannot encode field '%s'", field)
+
+}
diff --git a/daemon/logging/logging_outlets.go b/daemon/logging/logging_outlets.go
new file mode 100644
index 0000000..4a0fd7a
--- /dev/null
+++ b/daemon/logging/logging_outlets.go
@@ -0,0 +1,167 @@
+package logging
+
+import (
+	"bytes"
+	"context"
+	"crypto/tls"
+	"github.com/pkg/errors"
+	"github.com/zrepl/zrepl/logger"
+	"io"
+	"log/syslog"
+	"net"
+	"time"
+)
+
+type EntryFormatter interface {
+	SetMetadataFlags(flags MetadataFlags)
+	Format(e *logger.Entry) ([]byte, error)
+}
+
+type WriterOutlet struct {
+	formatter EntryFormatter
+	writer    io.Writer
+}
+
+func (h WriterOutlet) WriteEntry(entry logger.Entry) error {
+	bytes, err := h.formatter.Format(&entry)
+	if err != nil {
+		return err
+	}
+	_, err = h.writer.Write(bytes)
+	h.writer.Write([]byte("\n"))
+	return err
+}
+
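+// TCPOutlet sends log entries to a TCP (optionally TLS-wrapped) endpoint.
+// Writes never block the logger: entryChan buffers at most one in-flight
+// entry, and WriteEntry drops entries while the connection is broken or slow.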
+type TCPOutlet struct {
+	formatter EntryFormatter
+	// Specifies how much time must pass between a connection error and a reconnection attempt.
+	// Log entries written to the outlet during this interval are silently dropped.
+	connect   func(ctx context.Context) (net.Conn, error)
+	entryChan chan *bytes.Buffer
+}
+
+func NewTCPOutlet(formatter EntryFormatter, network, address string, tlsConfig *tls.Config, retryInterval time.Duration) *TCPOutlet {
+
+	connect := func(ctx context.Context) (conn net.Conn, err error) {
+		deadl, ok := ctx.Deadline()
+		if !ok {
+			deadl = time.Time{}
+		}
+		dialer := net.Dialer{
+			Deadline: deadl,
+		}
+		if tlsConfig != nil {
+			conn, err = tls.DialWithDialer(&dialer, network, address, tlsConfig)
+		} else {
+			conn, err = dialer.DialContext(ctx, network, address)
+		}
+		return
+	}
+
+	entryChan := make(chan *bytes.Buffer, 1) // allow one message in flight while the previous one is in io.Copy()
+
+	o := &TCPOutlet{
+		formatter: formatter,
+		connect:   connect,
+		entryChan: entryChan,
+	}
+
+	go o.outLoop(retryInterval)
+
+	return o
+}
+
+// FIXME: use this method
+func (h *TCPOutlet) Close() {
+	close(h.entryChan)
+}
+
+func (h *TCPOutlet) outLoop(retryInterval time.Duration) {
+
+	var retry time.Time
+	var conn net.Conn
+	for msg := range h.entryChan {
+		var err error
+		for conn == nil {
+			time.Sleep(time.Until(retry))
+			ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(retryInterval))
+			conn, err = h.connect(ctx)
+			cancel()
+			if err != nil {
+				retry = time.Now().Add(retryInterval)
+				conn = nil
+			}
+		}
+		conn.SetWriteDeadline(time.Now().Add(retryInterval))
+		_, err = io.Copy(conn, msg)
+		if err != nil {
+			retry = time.Now().Add(retryInterval)
+			conn.Close()
+			conn = nil
+		}
+	}
+}
+
+func (h *TCPOutlet) WriteEntry(e logger.Entry) error {
+
+	ebytes, err := h.formatter.Format(&e)
+	if err != nil {
+		return err
+	}
+
+	buf := new(bytes.Buffer)
+	buf.Write(ebytes)
+	buf.WriteString("\n")
+
+	select {
+	case h.entryChan <- buf:
+		return nil
+	default:
+		return errors.New("connection broken or not fast enough")
+	}
+}
+
+type SyslogOutlet struct {
+	Formatter          EntryFormatter
+	RetryInterval      time.Duration
+	writer             *syslog.Writer
+	lastConnectAttempt time.Time
+}
+
+func (o *SyslogOutlet) WriteEntry(entry logger.Entry) error {
+
+	bytes, err := o.Formatter.Format(&entry)
+	if err != nil {
+		return err
+	}
+
+	s := string(bytes)
+
+	if o.writer == nil {
+		now := time.Now()
+		if now.Sub(o.lastConnectAttempt) < o.RetryInterval {
+			return nil // not an error toward the logger
+		}
+		o.writer, err = syslog.New(syslog.LOG_LOCAL0, "zrepl")
+		o.lastConnectAttempt = time.Now()
+		if err != nil {
+			o.writer = nil
+			return err
+		}
+	}
+
+	switch entry.Level {
+	case logger.Debug:
+		return o.writer.Debug(s)
+	case logger.Info:
+		return o.writer.Info(s)
+	case logger.Warn:
+		return o.writer.Warning(s)
+	case logger.Error:
+		return o.writer.Err(s)
+	default:
+		return o.writer.Err(s) // write as error, as reaching this case is in fact an error
+	}
+
+}
diff --git a/daemon/main.go b/daemon/main.go
new file mode 100644
index 0000000..488b020
--- /dev/null
+++ b/daemon/main.go
@@ -0,0 +1,8 @@
+package daemon
+
+import (
+	"github.com/zrepl/zrepl/logger"
+)
+
+type Logger = logger.Logger
diff --git a/daemon/pprof.go b/daemon/pprof.go
new file mode 100644
index 0000000..2296ebd
--- /dev/null
+++ b/daemon/pprof.go
@@ -0,0 +1,80 @@
+package daemon
+
+import (
+	"context"
+	"net"
+	"net/http"
+	// FIXME: importing this package has the side effect of poisoning http.DefaultServeMux
+	// FIXME: with the /debug/pprof endpoints
+	"net/http/pprof"
+)
+
+type PProfServer struct {
+	cc       chan PprofServerControlMsg
+	state    PprofServerControlMsg
+	listener net.Listener
+}
+
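+// PprofServerControlMsg is decoded from requests to the control job's
+// /debug/pprof endpoint (see daemon/control.go) and starts or stops the
+// pprof listener at runtime.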
+type PprofServerControlMsg struct {
+	// Whether the server should listen for requests on the given address.
+	Run bool
+	// Must be set if Run is true, undefined otherwise.
+	HttpListenAddress string
+}
+
+func NewPProfServer(ctx context.Context) *PProfServer {
+
+	s := &PProfServer{
+		cc: make(chan PprofServerControlMsg),
+	}
+
+	go s.controlLoop(ctx)
+	return s
+}
+
+func (s *PProfServer) controlLoop(ctx context.Context) {
+outer:
+	for {
+
+		var msg PprofServerControlMsg
+		select {
+		case <-ctx.Done():
+			if s.listener != nil {
+				s.listener.Close()
+			}
+			break outer
+		case msg = <-s.cc:
+			// proceed
+		}
+
+		var err error
+		if msg.Run && s.listener == nil {
+
+			s.listener, err = net.Listen("tcp", msg.HttpListenAddress)
+			if err != nil {
+				s.listener = nil
+				continue
+			}
+
+			// FIXME: because net/http/pprof does not provide a mux, we wire up
+			// its handlers on a private mux instead of http.DefaultServeMux
+			mux := http.NewServeMux()
+			mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
+			mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
+			mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
+			mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
+			mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
+			go http.Serve(s.listener, mux)
+			continue
+		}
+
+		if !msg.Run && s.listener != nil {
+			s.listener.Close()
+			s.listener = nil
+			continue
+		}
+	}
+}
+
+func (s *PProfServer) Control(msg PprofServerControlMsg) {
+	s.cc <- msg
+}
diff --git a/daemon/prometheus.go b/daemon/prometheus.go
new file mode 100644
index 0000000..1cef3d0
--- /dev/null
+++ b/daemon/prometheus.go
@@ -0,0 +1,82 @@
+package daemon
+
+import (
+	"context"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"github.com/zrepl/zrepl/daemon/job"
+	"github.com/zrepl/zrepl/zfs"
+	"net"
+	"net/http"
+)
+
+type prometheusJob struct {
+	listen string
+}
+
+func newPrometheusJob(listen string) *prometheusJob {
+	return &prometheusJob{listen}
+}
+
+var prom struct {
+	taskLastActiveStart    *prometheus.GaugeVec
+	taskLastActiveDuration *prometheus.GaugeVec
+	taskLogEntries         *prometheus.CounterVec
+}
+
+func init() {
+	prom.taskLastActiveStart = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "zrepl",
+		Subsystem: "daemon",
+		Name:      "task_last_active_start",
+		Help:      "point in time at which the job task last left idle state",
+	}, []string{"zrepl_job", "job_type", "task"})
+	prom.taskLastActiveDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "zrepl",
+		Subsystem: "daemon",
+		Name:      "task_last_active_duration",
+		Help:      "seconds that the last run of a job task spent between leaving and re-entering idle state",
+	}, []string{"zrepl_job", "job_type", "task"})
+	prom.taskLogEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "zrepl",
+		Subsystem: "daemon",
+		Name:      "task_log_entries",
+		Help:      "number of log entries per job task and level",
+	}, []string{"zrepl_job", "job_type", "task", "level"})
+	prometheus.MustRegister(prom.taskLastActiveStart)
+	prometheus.MustRegister(prom.taskLastActiveDuration)
+	prometheus.MustRegister(prom.taskLogEntries)
+}
+
+func (j *prometheusJob) Name() string { return jobNamePrometheus }
+
+func (j *prometheusJob) Status() interface{} { return nil }
+
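+// Run serves /metrics on the configured listen address; note that daemon.Run
+// does not start this job yet (the prometheus job is still commented out there).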
+func (j *prometheusJob) Run(ctx context.Context) {
+
+	if err := zfs.PrometheusRegister(prometheus.DefaultRegisterer); err != nil {
+		panic(err)
+	}
+
+	log := job.GetLogger(ctx)
+
+	l, err := net.Listen("tcp", j.listen)
+	if err != nil {
+		log.WithError(err).Error("cannot listen")
+		return
+	}
+	go func() {
+		<-ctx.Done()
+		l.Close()
+	}()
+
+	mux := http.NewServeMux()
+	mux.Handle("/metrics", promhttp.Handler())
+
+	err = http.Serve(l, mux)
+	if err != nil {
+		log.WithError(err).Error("error while serving")
+	}
+
+}
diff --git a/main.go b/main.go
index a7919b6..452b02d 100644
--- a/main.go
+++ b/main.go
@@ -2,13 +2,51 @@ package main
 
 import (
-	"github.com/zrepl/zrepl/cmd"
 	"log"
 	"os"
+
+	"github.com/spf13/cobra"
+	"github.com/zrepl/zrepl/cmd/config"
+	"github.com/zrepl/zrepl/daemon"
 )
 
+var rootCmd = &cobra.Command{
+	Use:   "zrepl",
+	Short: "ZFS dataset replication",
+	Long: `Replicate ZFS filesystems & volumes between pools:
+
+  - push & pull mode
+  - automatic snapshot creation & pruning
+  - local / over the network
+  - ACLs instead of blank SSH access`,
+}
+
+var daemonCmd = &cobra.Command{
+	Use:   "daemon",
+	Short: "run the zrepl daemon",
+	RunE: func(cmd *cobra.Command, args []string) error {
+		conf, err := config.ParseConfig(rootArgs.configFile)
+		if err != nil {
+			return err
+		}
+		return daemon.Run(conf)
+	},
+}
+
+var rootArgs struct {
+	configFile string
+}
+
+func init() {
+	//cobra.OnInitialize(initConfig)
+	rootCmd.PersistentFlags().StringVar(&rootArgs.configFile, "config", "", "config file path")
+	rootCmd.AddCommand(daemonCmd)
+}
+
 func main() {
-	if err := cmd.RootCmd.Execute(); err != nil {
+	if err := rootCmd.Execute(); err != nil {
 		log.Printf("error executing root command: %s", err)
 		os.Exit(1)
 	}