From 6425c26b1b5f9d8d5751bc547304b5dc601b9c74 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 26 Aug 2018 21:58:58 +0200 Subject: [PATCH] start refactoring: move daemon into subpackage --- cmd/config_parse.go | 9 - cmd/config_serve_stdinserver.go | 3 +- cmd/control.go | 8 +- cmd/daemon.go | 28 ++- .../control.go} | 37 ++-- cmd/daemon/daemon.go | 162 ++++++++++++++++++ cmd/daemon/job/job.go | 47 +++++ cmd/{control_pprof.go => daemon/pprof.go} | 2 +- .../prometheus.go} | 37 ++-- cmd/{ => helpers}/helpers.go | 6 +- 10 files changed, 271 insertions(+), 68 deletions(-) rename cmd/{config_job_control.go => daemon/control.go} (78%) create mode 100644 cmd/daemon/daemon.go create mode 100644 cmd/daemon/job/job.go rename cmd/{control_pprof.go => daemon/pprof.go} (99%) rename cmd/{config_job_prometheus.go => daemon/prometheus.go} (69%) rename cmd/{ => helpers}/helpers.go (98%) diff --git a/cmd/config_parse.go b/cmd/config_parse.go index 3a5b1eb..36a2406 100644 --- a/cmd/config_parse.go +++ b/cmd/config_parse.go @@ -137,13 +137,6 @@ func parseConfig(i interface{}) (c *Config, err error) { c.Jobs[job.JobName()] = job } - cj, err := NewControlJob(JobNameControl, jpc.Global.Control.Sockpath) - if err != nil { - err = errors.Wrap(err, "cannot create control job") - return - } - c.Jobs[JobNameControl] = cj - return c, nil } @@ -201,8 +194,6 @@ func parseJob(c JobParsingContext, i map[string]interface{}) (j Job, err error) return parseSourceJob(c, name, i) case JobTypeLocal: return parseLocalJob(c, name, i) - case JobTypePrometheus: - return parsePrometheusJob(c, name, i) default: panic(fmt.Sprintf("implementation error: unknown job type %s", jobtype)) } diff --git a/cmd/config_serve_stdinserver.go b/cmd/config_serve_stdinserver.go index 2380cc8..4dd8e86 100644 --- a/cmd/config_serve_stdinserver.go +++ b/cmd/config_serve_stdinserver.go @@ -6,6 +6,7 @@ import ( "github.com/problame/go-netssh" "net" "path" + "github.com/zrepl/zrepl/cmd/helpers" ) type StdinserverListenerFactory struct { @@ -32,7 +33,7 @@ func parseStdinserverListenerFactory(c JobParsingContext, i map[string]interface func (f *StdinserverListenerFactory) Listen() (net.Listener, error) { - if err := PreparePrivateSockpath(f.sockpath); err != nil { + if err := helpers.PreparePrivateSockpath(f.sockpath); err != nil { return nil, err } diff --git a/cmd/control.go b/cmd/control.go index 2cec607..fee5349 100644 --- a/cmd/control.go +++ b/cmd/control.go @@ -13,6 +13,8 @@ import ( "net" "net/http" "os" + "github.com/zrepl/zrepl/version" + "github.com/zrepl/zrepl/cmd/daemon" ) var controlCmd = &cobra.Command{ @@ -48,7 +50,7 @@ var pprofCmd = &cobra.Command{ }, } var pprofCmdArgs struct { - msg PprofServerControlMsg + msg daemon.PprofServerControlMsg } var controlVersionCmd = &cobra.Command{ @@ -107,7 +109,7 @@ func doControlPProf(cmd *cobra.Command, args []string) { log.Printf("error marshaling request: %s", err) die() } - _, err = httpc.Post("http://unix"+ControlJobEndpointPProf, "application/json", &buf) + _, err = httpc.Post("http://unix"+daemon.ControlJobEndpointPProf, "application/json", &buf) if err != nil { log.Printf("error: %s", err) die() @@ -131,7 +133,7 @@ func doControLVersionCmd(cmd *cobra.Command, args []string) { die() } - resp, err := httpc.Get("http://unix" + ControlJobEndpointVersion) + resp, err := httpc.Get("http://unix" + daemon.ControlJobEndpointVersion) if err != nil { log.Printf("error: %s", err) die() diff --git a/cmd/daemon.go b/cmd/daemon.go index 6593114..94a32d4 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -4,12 +4,14 @@ import ( "context" "fmt" "github.com/spf13/cobra" - "github.com/zrepl/zrepl/logger" "os" "os/signal" "syscall" "time" + "github.com/zrepl/zrepl/cmd/daemon" + "github.com/zrepl/zrepl/cmd/daemon/job" + "github.com/zrepl/zrepl/logger" ) // daemonCmd represents the daemon command @@ -57,6 +59,20 @@ func (j JobType) String() string { return string(j) } +type daemonJobAdaptor struct { + j Job +} + +func (a daemonJobAdaptor) Name() string { + return a.j.JobName() +} + +func (a daemonJobAdaptor) Run(ctx context.Context) { + a.j.JobStart(ctx) +} + +func (a daemonJobAdaptor) Status() interface{} { return nil } + func doDaemon(cmd *cobra.Command, args []string) { conf, err := ParseConfig(rootArgs.configFile) @@ -66,13 +82,13 @@ func doDaemon(cmd *cobra.Command, args []string) { } log := logger.NewLogger(conf.Global.logging.Outlets, 1*time.Second) - - log.Info(NewZreplVersionInformation().String()) - log.Debug("starting daemon") ctx := WithLogger(context.Background(), log) - d := NewDaemon(conf) - d.Loop(ctx) + daemonJobs := make([]job.Job, 0, len(conf.Jobs)) + for i := range conf.Jobs { + daemonJobs = append(daemonJobs, daemonJobAdaptor{conf.Jobs[i]}) + } + daemon.Run(ctx, conf.Global.Control.Sockpath, conf.Global.logging.Outlets, daemonJobs) } diff --git a/cmd/config_job_control.go b/cmd/daemon/control.go similarity index 78% rename from cmd/config_job_control.go rename to cmd/daemon/control.go index 3c0d802..c7d17a1 100644 --- a/cmd/config_job_control.go +++ b/cmd/daemon/control.go @@ -1,4 +1,4 @@ -package cmd +package daemon import ( "bytes" @@ -9,15 +9,18 @@ import ( "io" "net" "net/http" + "github.com/zrepl/zrepl/cmd/daemon/job" + "github.com/zrepl/zrepl/version" + "github.com/zrepl/zrepl/cmd/helpers" ) -type ControlJob struct { - Name string +type controlJob struct { sockaddr *net.UnixAddr + jobs *jobs } -func NewControlJob(name, sockpath string) (j *ControlJob, err error) { - j = &ControlJob{Name: name} +func newControlJob(sockpath string, jobs *jobs) (j *controlJob, err error) { + j = &controlJob{jobs: jobs} j.sockaddr, err = net.ResolveUnixAddr("unix", sockpath) if err != nil { @@ -28,11 +31,9 @@ func NewControlJob(name, sockpath string) (j *ControlJob, err error) { return } -func (j *ControlJob) JobName() string { - return j.Name -} +func (j *controlJob) Name() string { return jobNameControl } -func (j *ControlJob) JobType() JobType { return JobTypeControl } +func (j *controlJob) Status() interface{} { return nil } const ( ControlJobEndpointPProf string = "/debug/pprof" @@ -40,14 +41,12 @@ const ( ControlJobEndpointStatus string = "/status" ) -func (j *ControlJob) JobStart(ctx context.Context) { +func (j *controlJob) Run(ctx context.Context) { - log := getLogger(ctx) + log := job.GetLogger(ctx) defer log.Info("control job finished") - daemon := ctx.Value(contextKeyDaemon).(*Daemon) - - l, err := ListenUnixPrivate(j.sockaddr) + l, err := helpers.ListenUnixPrivate(j.sockaddr) if err != nil { log.WithError(err).Error("error listening") return @@ -68,16 +67,12 @@ func (j *ControlJob) JobStart(ctx context.Context) { }}) mux.Handle(ControlJobEndpointVersion, requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) { - return NewZreplVersionInformation(), nil + return version.NewZreplVersionInformation(), nil }}}) mux.Handle(ControlJobEndpointStatus, requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) { - panic("FIXME") // FIXME - }}}) - mux.Handle("/pulljobreport", - requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) { - j := daemon.conf.Jobs["debian"] - return j.(*PullJob).Report(), nil + s := j.jobs.status() + return s, nil }}}) server := http.Server{Handler: mux} diff --git a/cmd/daemon/daemon.go b/cmd/daemon/daemon.go new file mode 100644 index 0000000..6240d01 --- /dev/null +++ b/cmd/daemon/daemon.go @@ -0,0 +1,162 @@ +package daemon + +import ( + "context" + "os" + "os/signal" + "syscall" + "sync" + "fmt" + "github.com/zrepl/zrepl/cmd/daemon/job" + "strings" + "github.com/zrepl/zrepl/logger" + "github.com/zrepl/zrepl/version" + "time" +) + + +func Run(ctx context.Context, controlSockpath string, outlets *logger.Outlets, confJobs []job.Job) { + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigChan + cancel() + }() + + log := logger.NewLogger(outlets, 1*time.Second) + log.Info(version.NewZreplVersionInformation().String()) + + // parse config + for _, job := range confJobs { + if IsInternalJobName(job.Name()) { + panic(fmt.Sprintf("internal job name used for config job '%s'", job.Name())) //FIXME + } + } + + ctx = job.WithLogger(ctx, log) + + jobs := newJobs() + + // start control socket + controlJob, err := newControlJob(controlSockpath, jobs) + if err != nil { + panic(err) // FIXME + } + jobs.start(ctx, controlJob, true) + + // start prometheus + //var promJob *prometheusJob // FIXME + //jobs.start(ctx, promJob, true) + + log.Info("starting daemon") + + // start regular jobs + for _, j := range confJobs { + jobs.start(ctx, j, false) + } + + select { + case <-jobs.wait(): + log.Info("all jobs finished") + case <-ctx.Done(): + log.WithError(ctx.Err()).Info("context finished") + } + log.Info("daemon exiting") +} + +type jobs struct { + wg sync.WaitGroup + + // m protects all fields below it + m sync.RWMutex + wakeups map[string]job.WakeupChan // by JobName + jobs map[string]job.Job +} + +func newJobs() *jobs { + return &jobs{ + wakeups: make(map[string]job.WakeupChan), + jobs: make(map[string]job.Job), + } +} + +const ( + logJobField string = "job" + logTaskField string = "task" + logSubsysField string = "subsystem" +) + +func (s *jobs) wait() <-chan struct{} { + ch := make(chan struct{}) + go func() { + s.wg.Wait() + }() + return ch +} + +func (s *jobs) status() map[string]interface{} { + s.m.RLock() + defer s.m.RUnlock() + + type res struct { + name string + status interface{} + } + var wg sync.WaitGroup + c := make(chan res, len(s.jobs)) + for name, j := range s.jobs { + wg.Add(1) + go func(name string, j job.Job) { + defer wg.Done() + c <- res{name: name, status: j.Status()} + }(name, j) + } + wg.Wait() + close(c) + ret := make(map[string]interface{}, len(s.jobs)) + for res := range c { + ret[res.name] = res.status + } + return ret +} + +const ( + jobNamePrometheus = "_prometheus" + jobNameControl = "_control" +) + +func IsInternalJobName(s string) bool { + return strings.HasPrefix(s, "_") +} + +func (s *jobs) start(ctx context.Context, j job.Job, internal bool) { + s.m.Lock() + defer s.m.Unlock() + + jobLog := job.GetLogger(ctx).WithField(logJobField, j.Name()) + jobName := j.Name() + if !internal && IsInternalJobName(jobName) { + panic(fmt.Sprintf("internal job name used for non-internal job %s", jobName)) + } + if internal && !IsInternalJobName(jobName) { + panic(fmt.Sprintf("internal job does not use internal job name %s", jobName)) + } + if _, ok := s.jobs[jobName]; ok { + panic(fmt.Sprintf("duplicate job name %s", jobName)) + } + s.jobs[jobName] = j + ctx = job.WithLogger(ctx, jobLog) + ctx, wakeupChan := job.WithWakeup(ctx) + s.wakeups[jobName] = wakeupChan + + s.wg.Add(1) + go func() { + defer s.wg.Done() + jobLog.Info("starting job") + defer jobLog.Info("job exited") + j.Run(ctx) + }() +} diff --git a/cmd/daemon/job/job.go b/cmd/daemon/job/job.go new file mode 100644 index 0000000..59cc147 --- /dev/null +++ b/cmd/daemon/job/job.go @@ -0,0 +1,47 @@ +package job + +import ( + "github.com/zrepl/zrepl/logger" + "context" +) + +type Logger = logger.Logger + +type contextKey int + +const ( + contextKeyLog contextKey = iota + contextKeyWakeup +) + +func GetLogger(ctx context.Context) Logger { + if l, ok := ctx.Value(contextKeyLog).(Logger); ok { + return l + } + return logger.NewNullLogger() +} + +func WithLogger(ctx context.Context, l Logger) context.Context { + return context.WithValue(ctx, contextKeyLog, l) +} + +func WithWakeup(ctx context.Context) (context.Context, WakeupChan) { + wc := make(chan struct{}, 1) + return context.WithValue(ctx, contextKeyWakeup, wc), wc +} + +type Job interface { + Name() string + Run(ctx context.Context) + Status() interface{} +} + +type WakeupChan <-chan struct{} + +func WaitWakeup(ctx context.Context) WakeupChan { + wc, ok := ctx.Value(contextKeyWakeup).(WakeupChan) + if !ok { + wc = make(chan struct{}) + } + return wc +} diff --git a/cmd/control_pprof.go b/cmd/daemon/pprof.go similarity index 99% rename from cmd/control_pprof.go rename to cmd/daemon/pprof.go index a6b6939..2296ebd 100644 --- a/cmd/control_pprof.go +++ b/cmd/daemon/pprof.go @@ -1,4 +1,4 @@ -package cmd +package daemon import ( "net/http" diff --git a/cmd/config_job_prometheus.go b/cmd/daemon/prometheus.go similarity index 69% rename from cmd/config_job_prometheus.go rename to cmd/daemon/prometheus.go index 62400f2..afe4fd5 100644 --- a/cmd/config_job_prometheus.go +++ b/cmd/daemon/prometheus.go @@ -1,19 +1,21 @@ -package cmd +package daemon import ( "context" - "github.com/mitchellh/mapstructure" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/zrepl/zrepl/zfs" "net" "net/http" + "github.com/zrepl/zrepl/cmd/daemon/job" ) -type PrometheusJob struct { - Name string - Listen string +type prometheusJob struct { + listen string +} + +func newPrometheusJob(listen string) *prometheusJob { + return &prometheusJob{listen} } var prom struct { @@ -46,32 +48,19 @@ func init() { prometheus.MustRegister(prom.taskLogEntries) } -func parsePrometheusJob(c JobParsingContext, name string, i map[string]interface{}) (j *PrometheusJob, err error) { - var s struct { - Listen string - } - if err := mapstructure.Decode(i, &s); err != nil { - return nil, errors.Wrap(err, "mapstructure error") - } - if s.Listen == "" { - return nil, errors.New("must specify 'listen' attribute") - } - return &PrometheusJob{name, s.Listen}, nil -} +func (j *prometheusJob) Name() string { return jobNamePrometheus } -func (j *PrometheusJob) JobName() string { return j.Name } +func (j *prometheusJob) Status() interface{} { return nil } -func (j *PrometheusJob) JobType() JobType { return JobTypePrometheus } - -func (j *PrometheusJob) JobStart(ctx context.Context) { +func (j *prometheusJob) Run(ctx context.Context) { if err := zfs.PrometheusRegister(prometheus.DefaultRegisterer); err != nil { panic(err) } - log := getLogger(ctx) + log := job.GetLogger(ctx) - l, err := net.Listen("tcp", j.Listen) + l, err := net.Listen("tcp", j.listen) if err != nil { log.WithError(err).Error("cannot listen") } diff --git a/cmd/helpers.go b/cmd/helpers/helpers.go similarity index 98% rename from cmd/helpers.go rename to cmd/helpers/helpers.go index be3864e..01d6a9c 100644 --- a/cmd/helpers.go +++ b/cmd/helpers/helpers.go @@ -1,10 +1,10 @@ -package cmd +package helpers import ( + "path/filepath" + "os" "github.com/pkg/errors" "net" - "os" - "path/filepath" ) func PreparePrivateSockpath(sockpath string) error {