start refactoring: move daemon into subpackage

This commit is contained in:
Christian Schwarz 2018-08-26 21:58:58 +02:00
parent 428339e1ad
commit 6425c26b1b
10 changed files with 271 additions and 68 deletions

View File

@ -137,13 +137,6 @@ func parseConfig(i interface{}) (c *Config, err error) {
c.Jobs[job.JobName()] = job c.Jobs[job.JobName()] = job
} }
cj, err := NewControlJob(JobNameControl, jpc.Global.Control.Sockpath)
if err != nil {
err = errors.Wrap(err, "cannot create control job")
return
}
c.Jobs[JobNameControl] = cj
return c, nil return c, nil
} }
@ -201,8 +194,6 @@ func parseJob(c JobParsingContext, i map[string]interface{}) (j Job, err error)
return parseSourceJob(c, name, i) return parseSourceJob(c, name, i)
case JobTypeLocal: case JobTypeLocal:
return parseLocalJob(c, name, i) return parseLocalJob(c, name, i)
case JobTypePrometheus:
return parsePrometheusJob(c, name, i)
default: default:
panic(fmt.Sprintf("implementation error: unknown job type %s", jobtype)) panic(fmt.Sprintf("implementation error: unknown job type %s", jobtype))
} }

View File

@ -6,6 +6,7 @@ import (
"github.com/problame/go-netssh" "github.com/problame/go-netssh"
"net" "net"
"path" "path"
"github.com/zrepl/zrepl/cmd/helpers"
) )
type StdinserverListenerFactory struct { type StdinserverListenerFactory struct {
@ -32,7 +33,7 @@ func parseStdinserverListenerFactory(c JobParsingContext, i map[string]interface
func (f *StdinserverListenerFactory) Listen() (net.Listener, error) { func (f *StdinserverListenerFactory) Listen() (net.Listener, error) {
if err := PreparePrivateSockpath(f.sockpath); err != nil { if err := helpers.PreparePrivateSockpath(f.sockpath); err != nil {
return nil, err return nil, err
} }

View File

@ -13,6 +13,8 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"github.com/zrepl/zrepl/version"
"github.com/zrepl/zrepl/cmd/daemon"
) )
var controlCmd = &cobra.Command{ var controlCmd = &cobra.Command{
@ -48,7 +50,7 @@ var pprofCmd = &cobra.Command{
}, },
} }
var pprofCmdArgs struct { var pprofCmdArgs struct {
msg PprofServerControlMsg msg daemon.PprofServerControlMsg
} }
var controlVersionCmd = &cobra.Command{ var controlVersionCmd = &cobra.Command{
@ -107,7 +109,7 @@ func doControlPProf(cmd *cobra.Command, args []string) {
log.Printf("error marshaling request: %s", err) log.Printf("error marshaling request: %s", err)
die() die()
} }
_, err = httpc.Post("http://unix"+ControlJobEndpointPProf, "application/json", &buf) _, err = httpc.Post("http://unix"+daemon.ControlJobEndpointPProf, "application/json", &buf)
if err != nil { if err != nil {
log.Printf("error: %s", err) log.Printf("error: %s", err)
die() die()
@ -131,7 +133,7 @@ func doControLVersionCmd(cmd *cobra.Command, args []string) {
die() die()
} }
resp, err := httpc.Get("http://unix" + ControlJobEndpointVersion) resp, err := httpc.Get("http://unix" + daemon.ControlJobEndpointVersion)
if err != nil { if err != nil {
log.Printf("error: %s", err) log.Printf("error: %s", err)
die() die()

View File

@ -4,12 +4,14 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/zrepl/zrepl/logger"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"time" "time"
"github.com/zrepl/zrepl/cmd/daemon"
"github.com/zrepl/zrepl/cmd/daemon/job"
"github.com/zrepl/zrepl/logger"
) )
// daemonCmd represents the daemon command // daemonCmd represents the daemon command
@ -57,6 +59,20 @@ func (j JobType) String() string {
return string(j) return string(j)
} }
type daemonJobAdaptor struct {
j Job
}
func (a daemonJobAdaptor) Name() string {
return a.j.JobName()
}
func (a daemonJobAdaptor) Run(ctx context.Context) {
a.j.JobStart(ctx)
}
func (a daemonJobAdaptor) Status() interface{} { return nil }
func doDaemon(cmd *cobra.Command, args []string) { func doDaemon(cmd *cobra.Command, args []string) {
conf, err := ParseConfig(rootArgs.configFile) conf, err := ParseConfig(rootArgs.configFile)
@ -66,13 +82,13 @@ func doDaemon(cmd *cobra.Command, args []string) {
} }
log := logger.NewLogger(conf.Global.logging.Outlets, 1*time.Second) log := logger.NewLogger(conf.Global.logging.Outlets, 1*time.Second)
log.Info(NewZreplVersionInformation().String())
log.Debug("starting daemon")
ctx := WithLogger(context.Background(), log) ctx := WithLogger(context.Background(), log)
d := NewDaemon(conf) daemonJobs := make([]job.Job, 0, len(conf.Jobs))
d.Loop(ctx) for i := range conf.Jobs {
daemonJobs = append(daemonJobs, daemonJobAdaptor{conf.Jobs[i]})
}
daemon.Run(ctx, conf.Global.Control.Sockpath, conf.Global.logging.Outlets, daemonJobs)
} }

View File

@ -1,4 +1,4 @@
package cmd package daemon
import ( import (
"bytes" "bytes"
@ -9,15 +9,18 @@ import (
"io" "io"
"net" "net"
"net/http" "net/http"
"github.com/zrepl/zrepl/cmd/daemon/job"
"github.com/zrepl/zrepl/version"
"github.com/zrepl/zrepl/cmd/helpers"
) )
type ControlJob struct { type controlJob struct {
Name string
sockaddr *net.UnixAddr sockaddr *net.UnixAddr
jobs *jobs
} }
func NewControlJob(name, sockpath string) (j *ControlJob, err error) { func newControlJob(sockpath string, jobs *jobs) (j *controlJob, err error) {
j = &ControlJob{Name: name} j = &controlJob{jobs: jobs}
j.sockaddr, err = net.ResolveUnixAddr("unix", sockpath) j.sockaddr, err = net.ResolveUnixAddr("unix", sockpath)
if err != nil { if err != nil {
@ -28,11 +31,9 @@ func NewControlJob(name, sockpath string) (j *ControlJob, err error) {
return return
} }
func (j *ControlJob) JobName() string { func (j *controlJob) Name() string { return jobNameControl }
return j.Name
}
func (j *ControlJob) JobType() JobType { return JobTypeControl } func (j *controlJob) Status() interface{} { return nil }
const ( const (
ControlJobEndpointPProf string = "/debug/pprof" ControlJobEndpointPProf string = "/debug/pprof"
@ -40,14 +41,12 @@ const (
ControlJobEndpointStatus string = "/status" ControlJobEndpointStatus string = "/status"
) )
func (j *ControlJob) JobStart(ctx context.Context) { func (j *controlJob) Run(ctx context.Context) {
log := getLogger(ctx) log := job.GetLogger(ctx)
defer log.Info("control job finished") defer log.Info("control job finished")
daemon := ctx.Value(contextKeyDaemon).(*Daemon) l, err := helpers.ListenUnixPrivate(j.sockaddr)
l, err := ListenUnixPrivate(j.sockaddr)
if err != nil { if err != nil {
log.WithError(err).Error("error listening") log.WithError(err).Error("error listening")
return return
@ -68,16 +67,12 @@ func (j *ControlJob) JobStart(ctx context.Context) {
}}) }})
mux.Handle(ControlJobEndpointVersion, mux.Handle(ControlJobEndpointVersion,
requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) { requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
return NewZreplVersionInformation(), nil return version.NewZreplVersionInformation(), nil
}}}) }}})
mux.Handle(ControlJobEndpointStatus, mux.Handle(ControlJobEndpointStatus,
requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) { requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
panic("FIXME") // FIXME s := j.jobs.status()
}}}) return s, nil
mux.Handle("/pulljobreport",
requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
j := daemon.conf.Jobs["debian"]
return j.(*PullJob).Report(), nil
}}}) }}})
server := http.Server{Handler: mux} server := http.Server{Handler: mux}

162
cmd/daemon/daemon.go Normal file
View File

@ -0,0 +1,162 @@
package daemon
import (
"context"
"os"
"os/signal"
"syscall"
"sync"
"fmt"
"github.com/zrepl/zrepl/cmd/daemon/job"
"strings"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/version"
"time"
)
func Run(ctx context.Context, controlSockpath string, outlets *logger.Outlets, confJobs []job.Job) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
cancel()
}()
log := logger.NewLogger(outlets, 1*time.Second)
log.Info(version.NewZreplVersionInformation().String())
// parse config
for _, job := range confJobs {
if IsInternalJobName(job.Name()) {
panic(fmt.Sprintf("internal job name used for config job '%s'", job.Name())) //FIXME
}
}
ctx = job.WithLogger(ctx, log)
jobs := newJobs()
// start control socket
controlJob, err := newControlJob(controlSockpath, jobs)
if err != nil {
panic(err) // FIXME
}
jobs.start(ctx, controlJob, true)
// start prometheus
//var promJob *prometheusJob // FIXME
//jobs.start(ctx, promJob, true)
log.Info("starting daemon")
// start regular jobs
for _, j := range confJobs {
jobs.start(ctx, j, false)
}
select {
case <-jobs.wait():
log.Info("all jobs finished")
case <-ctx.Done():
log.WithError(ctx.Err()).Info("context finished")
}
log.Info("daemon exiting")
}
type jobs struct {
wg sync.WaitGroup
// m protects all fields below it
m sync.RWMutex
wakeups map[string]job.WakeupChan // by JobName
jobs map[string]job.Job
}
func newJobs() *jobs {
return &jobs{
wakeups: make(map[string]job.WakeupChan),
jobs: make(map[string]job.Job),
}
}
const (
logJobField string = "job"
logTaskField string = "task"
logSubsysField string = "subsystem"
)
func (s *jobs) wait() <-chan struct{} {
ch := make(chan struct{})
go func() {
s.wg.Wait()
}()
return ch
}
func (s *jobs) status() map[string]interface{} {
s.m.RLock()
defer s.m.RUnlock()
type res struct {
name string
status interface{}
}
var wg sync.WaitGroup
c := make(chan res, len(s.jobs))
for name, j := range s.jobs {
wg.Add(1)
go func(name string, j job.Job) {
defer wg.Done()
c <- res{name: name, status: j.Status()}
}(name, j)
}
wg.Wait()
close(c)
ret := make(map[string]interface{}, len(s.jobs))
for res := range c {
ret[res.name] = res.status
}
return ret
}
const (
jobNamePrometheus = "_prometheus"
jobNameControl = "_control"
)
func IsInternalJobName(s string) bool {
return strings.HasPrefix(s, "_")
}
func (s *jobs) start(ctx context.Context, j job.Job, internal bool) {
s.m.Lock()
defer s.m.Unlock()
jobLog := job.GetLogger(ctx).WithField(logJobField, j.Name())
jobName := j.Name()
if !internal && IsInternalJobName(jobName) {
panic(fmt.Sprintf("internal job name used for non-internal job %s", jobName))
}
if internal && !IsInternalJobName(jobName) {
panic(fmt.Sprintf("internal job does not use internal job name %s", jobName))
}
if _, ok := s.jobs[jobName]; ok {
panic(fmt.Sprintf("duplicate job name %s", jobName))
}
s.jobs[jobName] = j
ctx = job.WithLogger(ctx, jobLog)
ctx, wakeupChan := job.WithWakeup(ctx)
s.wakeups[jobName] = wakeupChan
s.wg.Add(1)
go func() {
defer s.wg.Done()
jobLog.Info("starting job")
defer jobLog.Info("job exited")
j.Run(ctx)
}()
}

47
cmd/daemon/job/job.go Normal file
View File

@ -0,0 +1,47 @@
package job
import (
"github.com/zrepl/zrepl/logger"
"context"
)
type Logger = logger.Logger
type contextKey int
const (
contextKeyLog contextKey = iota
contextKeyWakeup
)
func GetLogger(ctx context.Context) Logger {
if l, ok := ctx.Value(contextKeyLog).(Logger); ok {
return l
}
return logger.NewNullLogger()
}
func WithLogger(ctx context.Context, l Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, l)
}
func WithWakeup(ctx context.Context) (context.Context, WakeupChan) {
wc := make(chan struct{}, 1)
return context.WithValue(ctx, contextKeyWakeup, wc), wc
}
type Job interface {
Name() string
Run(ctx context.Context)
Status() interface{}
}
type WakeupChan <-chan struct{}
func WaitWakeup(ctx context.Context) WakeupChan {
wc, ok := ctx.Value(contextKeyWakeup).(WakeupChan)
if !ok {
wc = make(chan struct{})
}
return wc
}

View File

@ -1,4 +1,4 @@
package cmd package daemon
import ( import (
"net/http" "net/http"

View File

@ -1,19 +1,21 @@
package cmd package daemon
import ( import (
"context" "context"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/zrepl/zrepl/zfs" "github.com/zrepl/zrepl/zfs"
"net" "net"
"net/http" "net/http"
"github.com/zrepl/zrepl/cmd/daemon/job"
) )
type PrometheusJob struct { type prometheusJob struct {
Name string listen string
Listen string }
func newPrometheusJob(listen string) *prometheusJob {
return &prometheusJob{listen}
} }
var prom struct { var prom struct {
@ -46,32 +48,19 @@ func init() {
prometheus.MustRegister(prom.taskLogEntries) prometheus.MustRegister(prom.taskLogEntries)
} }
func parsePrometheusJob(c JobParsingContext, name string, i map[string]interface{}) (j *PrometheusJob, err error) { func (j *prometheusJob) Name() string { return jobNamePrometheus }
var s struct {
Listen string
}
if err := mapstructure.Decode(i, &s); err != nil {
return nil, errors.Wrap(err, "mapstructure error")
}
if s.Listen == "" {
return nil, errors.New("must specify 'listen' attribute")
}
return &PrometheusJob{name, s.Listen}, nil
}
func (j *PrometheusJob) JobName() string { return j.Name } func (j *prometheusJob) Status() interface{} { return nil }
func (j *PrometheusJob) JobType() JobType { return JobTypePrometheus } func (j *prometheusJob) Run(ctx context.Context) {
func (j *PrometheusJob) JobStart(ctx context.Context) {
if err := zfs.PrometheusRegister(prometheus.DefaultRegisterer); err != nil { if err := zfs.PrometheusRegister(prometheus.DefaultRegisterer); err != nil {
panic(err) panic(err)
} }
log := getLogger(ctx) log := job.GetLogger(ctx)
l, err := net.Listen("tcp", j.Listen) l, err := net.Listen("tcp", j.listen)
if err != nil { if err != nil {
log.WithError(err).Error("cannot listen") log.WithError(err).Error("cannot listen")
} }

View File

@ -1,10 +1,10 @@
package cmd package helpers
import ( import (
"path/filepath"
"os"
"github.com/pkg/errors" "github.com/pkg/errors"
"net" "net"
"os"
"path/filepath"
) )
func PreparePrivateSockpath(sockpath string) error { func PreparePrivateSockpath(sockpath string) error {