zrepl/cmd/daemon.go
2018-08-26 22:06:47 +02:00

176 lines
3.4 KiB
Go

package cmd
import (
"context"
"fmt"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/cmd/config"
"github.com/zrepl/zrepl/logger"
"os"
"os/signal"
"syscall"
"time"
"github.com/zrepl/zrepl/cmd/daemon"
"github.com/zrepl/zrepl/cmd/daemon/job"
)
// daemonCmd is the cobra command for the "daemon" subcommand.
// Running it invokes doDaemon; init adds it to RootCmd.
var daemonCmd = &cobra.Command{
Use: "daemon",
Short: "start daemon",
Run: doDaemon,
}
// init adds daemonCmd to RootCmd at package initialization time.
func init() {
RootCmd.AddCommand(daemonCmd)
}
// Job is the interface the daemon uses to identify and run a configured job.
type Job interface {
// JobName returns the job's name; Daemon.Loop attaches it to the job's
// logger and daemonJobAdaptor reports it as Name.
JobName() string
// JobType returns the kind of the job as a JobType.
JobType() JobType
// JobStart runs the job. Daemon.Loop calls it in a separate goroutine and
// treats its return as the job having finished; the context it passes
// carries the job's logger and is cancelled on SIGINT/SIGTERM.
JobStart(ctxt context.Context)
}
// JobType names the kind of a job. Its underlying string is the keyword
// that ParseUserJobType matches against.
type JobType string

// The job kinds known to this package.
const (
	JobTypePull       JobType = "pull"
	JobTypeSource     JobType = "source"
	JobTypeLocal      JobType = "local"
	JobTypePrometheus JobType = "prometheus"
	JobTypeControl    JobType = "control"
)

// ParseUserJobType maps the string s onto the JobType it names.
//
// Only "pull", "source", "local" and "prometheus" are accepted. Every other
// input, including "control", yields an empty JobType and an error that
// quotes s.
func ParseUserJobType(s string) (JobType, error) {
	switch parsed := JobType(s); parsed {
	case JobTypePull, JobTypeSource, JobTypeLocal, JobTypePrometheus:
		return parsed, nil
	default:
		return "", fmt.Errorf("unknown job type '%s'", s)
	}
}

// String returns the job type as a plain string.
func (j JobType) String() string {
	asString := string(j)
	return asString
}
// daemonJobAdaptor wraps a Job and exposes it through Name, Run and Status
// methods. doDaemon stores these adaptors in a []job.Job.
type daemonJobAdaptor struct {
	j Job // the wrapped job every call is forwarded to
}

// Name reports the wrapped job's JobName.
func (a daemonJobAdaptor) Name() string {
	name := a.j.JobName()
	return name
}

// Run forwards to the wrapped job's JobStart, passing ctx through unchanged.
func (a daemonJobAdaptor) Run(ctx context.Context) {
	wrapped := a.j
	wrapped.JobStart(ctx)
}

// Status always returns nil; the adaptor exposes no job state.
func (a daemonJobAdaptor) Status() interface{} {
	return nil
}
// doDaemon is the cobra Run handler of the daemon command.
//
// It parses the configuration file named by rootArgs.configFile, builds a
// logger from the configuration's logging section, wraps every configured job
// in a daemonJobAdaptor and hands the result to daemon.Run. cmd and args are
// not used.
//
// A configuration or logging error is printed to stderr and terminates the
// process with exit status 1.
func doDaemon(cmd *cobra.Command, args []string) {
	conf, err := config.ParseConfig(rootArgs.configFile)
	if err != nil {
		fmt.Fprintf(os.Stderr, "error parsing config: %s\n", err)
		os.Exit(1)
	}

	outlets, err := parseLogging(conf.Global.Logging)
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to generate logger: %s\n", err)
		// Exit non-zero, like the config error above, so that supervisors
		// and scripts can tell the daemon failed to start.
		os.Exit(1)
	}
	log := logger.NewLogger(outlets.Outlets, 1*time.Second)
	ctx := WithLogger(context.Background(), log)

	daemonJobs := make([]job.Job, 0, len(conf.Jobs))
	for i := range conf.Jobs {
		daemonJobs = append(daemonJobs, daemonJobAdaptor{conf.Jobs[i]})
	}

	// Hand over the outlets produced by parseLogging, i.e. the same value the
	// logger above was built from. The configuration value comes from another
	// package, so its unexported fields cannot be read here.
	// NOTE(review): confirm daemon.Run expects exactly this outlets value.
	daemon.Run(ctx, conf.Global.Control.Sockpath, outlets.Outlets, daemonJobs)
}
// contextKey is the type of the keys under which this package stores values
// in a context.Context.
type contextKey string

const (
	// contextKeyLog is the key WithLogger stores the logger under and
	// getLogger reads it back from.
	contextKeyLog contextKey = "log"
	// contextKeyDaemon is the key Daemon.Loop stores the running Daemon under.
	contextKeyDaemon contextKey = "daemon"
)
// getLogger returns the Logger stored in ctx under contextKeyLog.
//
// It panics if ctx holds no value for that key or if the value is not a
// Logger, because the type assertion is unchecked.
func getLogger(ctx context.Context) Logger {
	stored := ctx.Value(contextKeyLog)
	return stored.(Logger)
}
// WithLogger returns a context derived from ctx that carries l under
// contextKeyLog, where getLogger looks it up.
func WithLogger(ctx context.Context, l Logger) context.Context {
	withLog := context.WithValue(ctx, contextKeyLog, l)
	return withLog
}
// Daemon holds the state needed to run the jobs of one configuration; see Loop.
type Daemon struct {
// conf is the configuration handed to NewDaemon; Loop starts the jobs in conf.Jobs.
conf *Config
// startedAt is set to the current time when Loop begins.
startedAt time.Time
}
// NewDaemon returns a Daemon that uses initialConf as its configuration.
// The start time stays at its zero value until Loop is called.
func NewDaemon(initialConf *Config) *Daemon {
	d := new(Daemon)
	d.conf = initialConf
	return d
}
// Loop starts every job of the daemon's configuration in its own goroutine
// and blocks until all of them have returned.
//
// The logger is taken from ctx via getLogger, so ctx must carry one. Each job
// receives a context derived from ctx that additionally holds the Daemon
// under contextKeyDaemon and a logger tagged with the job's name.
//
// On SIGINT or SIGTERM the shared context is cancelled. Loop then keeps
// waiting for the jobs to return instead of exiting immediately. If no jobs
// are configured, Loop returns right away.
func (d *Daemon) Loop(ctx context.Context) {
	d.startedAt = time.Now()

	log := getLogger(ctx)

	ctx, cancel := context.WithCancel(ctx)
	// Release the cancellable context on every exit path.
	defer cancel()
	ctx = context.WithValue(ctx, contextKeyDaemon, d)

	sigChan := make(chan os.Signal, 1)
	finished := make(chan Job)

	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	log.Info("starting jobs from config")
	for _, j := range d.conf.Jobs {
		jobLog := log.WithField(logJobField, j.JobName())
		jobLog.Info("starting")
		jobCtx := WithLogger(ctx, jobLog)
		go func(j Job) {
			j.JobStart(jobCtx)
			finished <- j
		}(j)
	}

	// Checking the count in the loop condition, rather than only after a
	// job reports in, lets a configuration without jobs fall straight
	// through instead of waiting forever with signals intercepted.
	for finishCount := 0; finishCount < len(d.conf.Jobs); {
		select {
		case <-finished:
			finishCount++
		case sig := <-sigChan:
			log.WithField("signal", sig).Info("received signal")
			log.Info("cancelling all jobs")
			cancel()
		}
	}
	log.Info("all jobs finished")

	signal.Stop(sigChan)

	log.Info("exiting")
}