From a4da029105db9a06df1be39985ddade15b7ec7a9 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Fri, 13 Apr 2018 23:37:49 +0200
Subject: [PATCH] cmd: prometheus job type and Task instrumentation

refs #67
---
 cmd/config_job_prometheus.go | 113 +++++++++++++++++++++++++++++++++++
 cmd/config_parse.go          |   2 +
 cmd/daemon.go                |  38 ++++++++++++
 3 files changed, 153 insertions(+)
 create mode 100644 cmd/config_job_prometheus.go

diff --git a/cmd/config_job_prometheus.go b/cmd/config_job_prometheus.go
new file mode 100644
index 0000000..662a924
--- /dev/null
+++ b/cmd/config_job_prometheus.go
@@ -0,0 +1,113 @@
+package cmd
+
+import (
+	"context"
+	"github.com/mitchellh/mapstructure"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"github.com/zrepl/zrepl/zfs"
+	"net"
+	"net/http"
+)
+
+// PrometheusJob serves the metrics of the default Prometheus registry over HTTP.
+type PrometheusJob struct {
+	// Listen is the address handed to net.Listen("tcp", ...).
+	Listen string
+}
+
+// prom holds the per-task metrics that Task updates (see cmd/daemon.go).
+var prom struct {
+	taskLastActiveStart    *prometheus.GaugeVec
+	taskLastActiveDuration *prometheus.GaugeVec
+	taskLogEntries         *prometheus.CounterVec
+}
+
+// init creates the per-task metrics and registers them with the default
+// Prometheus registry; MustRegister panics if a registration fails.
+func init() {
+	prom.taskLastActiveStart = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "zrepl",
+		Subsystem: "daemon",
+		Name:      "task_last_active_start",
+		Help:      "point in time at which the job task last left idle state",
+	}, []string{"zrepl_job", "job_type", "task"})
+	prom.taskLastActiveDuration = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "zrepl",
+		Subsystem: "daemon",
+		Name:      "task_last_active_duration",
+		Help:      "seconds that the last run of a job task spent between leaving and re-entering idle state",
+	}, []string{"zrepl_job", "job_type", "task"})
+	prom.taskLogEntries = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "zrepl",
+		Subsystem: "daemon",
+		Name:      "task_log_entries",
+		Help:      "number of log entries per job task and level",
+	}, []string{"zrepl_job", "job_type", "task", "level"})
+	prometheus.MustRegister(prom.taskLastActiveStart)
+	prometheus.MustRegister(prom.taskLastActiveDuration)
+	prometheus.MustRegister(prom.taskLogEntries)
+}
+
+// parsePrometheusJob decodes the job configuration i into a PrometheusJob.
+// It fails if decoding fails or 'listen' is empty; c and name are unused.
+func parsePrometheusJob(c JobParsingContext, name string, i map[string]interface{}) (j *PrometheusJob, err error) {
+	var s struct {
+		Listen string
+	}
+	if err := mapstructure.Decode(i, &s); err != nil {
+		return nil, errors.Wrap(err, "mapstructure error")
+	}
+	if s.Listen == "" {
+		return nil, errors.New("must specify 'listen' attribute")
+	}
+	return &PrometheusJob{s.Listen}, nil
+}
+
+// JobName returns the fixed job name "prometheus".
+func (*PrometheusJob) JobName() string {
+	return "prometheus"
+}
+
+// JobType returns JobTypePrometheus.
+func (j *PrometheusJob) JobType() JobType { return JobTypePrometheus }
+
+// JobStart serves the metrics at /metrics on j.Listen until serving fails.
+// The listener is closed once ctx is done, which makes http.Serve return.
+func (j *PrometheusJob) JobStart(ctx context.Context) {
+
+	if err := zfs.PrometheusRegister(prometheus.DefaultRegisterer); err != nil {
+		panic(err)
+	}
+
+	log := ctx.Value(contextKeyLog).(Logger)
+	task := NewTask("main", j, log)
+	log = task.Log()
+
+	l, err := net.Listen("tcp", j.Listen)
+	if err != nil {
+		log.WithError(err).Error("cannot listen")
+		// Without a listener there is nothing to serve; continuing would
+		// hand the nil listener to l.Close() and http.Serve below.
+		return
+	}
+	go func() {
+		<-ctx.Done()
+		l.Close()
+	}()
+
+	mux := http.NewServeMux()
+	mux.Handle("/metrics", promhttp.Handler())
+
+	err = http.Serve(l, mux)
+	if err != nil {
+		log.WithError(err).Error("error while serving")
+	}
+
+}
+
+// JobStatus returns an empty JobStatus and a nil error.
+func (*PrometheusJob) JobStatus(ctxt context.Context) (*JobStatus, error) {
+	return &JobStatus{}, nil
+}
diff --git a/cmd/config_parse.go b/cmd/config_parse.go
index d39f405..ddbbd60 100644
--- a/cmd/config_parse.go
+++ b/cmd/config_parse.go
@@ -179,6 +179,8 @@ func parseJob(c JobParsingContext, i map[string]interface{}) (j Job, err error)
 		return parseSourceJob(c, name, i)
 	case JobTypeLocal:
 		return parseLocalJob(c, name, i)
+	case JobTypePrometheus:
+		return parsePrometheusJob(c, name, i)
 	default:
 		panic(fmt.Sprintf("implementation error: unknown job type %s", jobtype))
 	}
diff --git a/cmd/daemon.go b/cmd/daemon.go
index 139d472..46746a2 100644
--- a/cmd/daemon.go
+++ b/cmd/daemon.go
@@ -39,6 +39,7 @@ const (
 	JobTypePull    JobType = "pull"
 	JobTypeSource  JobType = "source"
 	JobTypeLocal   JobType = "local"
+	JobTypePrometheus JobType = "prometheus"
 	JobTypeControl JobType = "control"
 )
 
@@ -50,6 +51,8 @@ func ParseUserJobType(s string) (JobType, error) {
 		return JobTypeSource, nil
 	case "local":
 		return JobTypeLocal, nil
+	case "prometheus":
+		return JobTypePrometheus, nil
 	}
 	return "", fmt.Errorf("unknown job type '%s'", s)
 }
@@ -204,6 +207,7 @@ type TaskStatus struct {
 
 // An instance of Task tracks a single thread of activity that is part of a Job.
 type Task struct {
+	name string // immutable
 	parent Job // immutable
 
 	// Stack of activities the task is currently in
@@ -219,6 +223,7 @@ type Task struct {
 type taskProgress struct {
 	rx         int64
 	tx         int64
+	creation   time.Time
 	lastUpdate time.Time
 	logEntries []logger.Entry
 	mtx        sync.RWMutex
@@ -226,6 +231,7 @@ type taskProgress struct {
 
 func newTaskProgress() (p *taskProgress) {
 	return &taskProgress{
+		creation:   time.Now(),
 		logEntries: make([]logger.Entry, 0),
 	}
 }
@@ -250,6 +256,7 @@ func (p *taskProgress) DeepCopy() (out taskProgress) {
 	p.mtx.RLock()
 	defer p.mtx.RUnlock()
 	out.rx, out.tx = p.rx, p.tx
+	out.creation = p.creation
 	out.lastUpdate = p.lastUpdate
 	out.logEntries = make([]logger.Entry, len(p.logEntries))
 	for i := range p.logEntries {
@@ -280,6 +287,7 @@ type taskActivity struct {
 
 func NewTask(name string, parent Job, lg *logger.Logger) *Task {
 	t := &Task{
+		name:       name,
 		parent:     parent,
 		activities: list.New(),
 	}
@@ -318,6 +326,13 @@ func (t *Task) Enter(activity string) {
 		// reset progress when leaving idle task
 		// we leave the old progress dangling to have the user not worry about
 		prev.progress = newTaskProgress()
+
+		prom.taskLastActiveStart.WithLabelValues(
+			t.parent.JobName(),
+			t.parent.JobType().String(),
+			t.name).
+			Set(float64(prev.progress.creation.UnixNano()) / 1e9)
+
 	}
 	act := &taskActivity{activity, false, nil, prev.progress}
 	t.activities.PushFront(act)
@@ -391,6 +406,21 @@ func (t *Task) Finish() {
 	t.activities.Remove(top)
 	t.activitiesLastUpdate = time.Now()
 
+	// prometheus
+	front := t.activities.Front()
+	if front != nil && front == t.activities.Back() {
+		idleAct := front.Value.(*taskActivity)
+		if !idleAct.idle {
+			panic("inconsistent implementation")
+		}
+		progress := idleAct.progress.Read()
+		non_idle_time := t.activitiesLastUpdate.Sub(progress.creation) // use same time
+		prom.taskLastActiveDuration.WithLabelValues(
+			t.parent.JobName(),
+			t.parent.JobType().String(),
+			t.name).Set(non_idle_time.Seconds())
+	}
+
 }
 
 // Returns a logger derived from the logger passed to the constructor function.
@@ -407,6 +437,14 @@ func (t *Task) WriteEntry(entry logger.Entry) error {
 	t.rwl.RLock()
 	defer t.rwl.RUnlock()
 	t.cur().progress.UpdateLogEntry(entry)
+
+	prom.taskLogEntries.WithLabelValues(
+		t.parent.JobName(),
+		t.parent.JobType().String(),
+		t.name,
+		entry.Level.String()).
+		Inc()
+
 	return nil
 }
 